meta_srv/handler/
region_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use store_api::region_engine::GrantedRegion;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28use crate::region::lease_keeper::{
29    RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
30};
31use crate::region::RegionLeaseKeeper;
32
33pub struct RegionLeaseHandler {
34    region_lease_seconds: u64,
35    region_lease_keeper: RegionLeaseKeeperRef,
36    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
37}
38
39pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
40
41pub trait CustomizedRegionLeaseRenewer: Send + Sync {
42    fn renew(
43        &self,
44        ctx: &mut Context,
45        regions: HashMap<RegionId, RegionLeaseInfo>,
46    ) -> Vec<GrantedRegion>;
47}
48
49impl RegionLeaseHandler {
50    pub fn new(
51        region_lease_seconds: u64,
52        table_metadata_manager: TableMetadataManagerRef,
53        memory_region_keeper: MemoryRegionKeeperRef,
54        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
55    ) -> Self {
56        let region_lease_keeper =
57            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
58
59        Self {
60            region_lease_seconds,
61            region_lease_keeper: Arc::new(region_lease_keeper),
62            customized_region_lease_renewer,
63        }
64    }
65}
66
67#[async_trait]
68impl HeartbeatHandler for RegionLeaseHandler {
69    fn is_acceptable(&self, role: Role) -> bool {
70        role == Role::Datanode
71    }
72
73    async fn handle(
74        &self,
75        req: &HeartbeatRequest,
76        ctx: &mut Context,
77        acc: &mut HeartbeatAccumulator,
78    ) -> Result<HandleControl> {
79        let Some(stat) = acc.stat.as_ref() else {
80            return Ok(HandleControl::Continue);
81        };
82
83        let regions = stat.regions();
84        let datanode_id = stat.id;
85
86        let RenewRegionLeasesResponse {
87            non_exists,
88            renewed,
89        } = self
90            .region_lease_keeper
91            .renew_region_leases(datanode_id, &regions)
92            .await?;
93
94        let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
95            renewer
96                .renew(ctx, renewed)
97                .into_iter()
98                .map(|region| region.into())
99                .collect()
100        } else {
101            renewed
102                .into_iter()
103                .map(|(region_id, region_lease_info)| {
104                    GrantedRegion::new(region_id, region_lease_info.role).into()
105                })
106                .collect::<Vec<_>>()
107        };
108
109        acc.region_lease = Some(RegionLease {
110            regions: renewed,
111            duration_since_epoch: req.duration_since_epoch,
112            lease_seconds: self.region_lease_seconds,
113            closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
114        });
115        acc.inactive_region_ids = non_exists;
116
117        Ok(HandleControl::Continue)
118    }
119}
120
121#[cfg(test)]
122mod test {
123    use std::collections::{HashMap, HashSet};
124    use std::sync::Arc;
125
126    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
127    use common_meta::distributed_time_constants;
128    use common_meta::key::table_route::TableRouteValue;
129    use common_meta::key::test_utils::new_test_table_info;
130    use common_meta::key::TableMetadataManager;
131    use common_meta::kv_backend::memory::MemoryKvBackend;
132    use common_meta::peer::Peer;
133    use common_meta::region_keeper::MemoryRegionKeeper;
134    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
135    use store_api::region_engine::RegionRole;
136    use store_api::storage::RegionId;
137
138    use super::*;
139    use crate::metasrv::builder::MetasrvBuilder;
140
141    fn new_test_keeper() -> RegionLeaseKeeper {
142        let store = Arc::new(MemoryKvBackend::new());
143
144        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
145
146        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
147        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
148    }
149
150    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
151        RegionStat {
152            id: region_id,
153            role,
154            rcus: 0,
155            wcus: 0,
156            approximate_bytes: 0,
157            engine: String::new(),
158            num_rows: 0,
159            memtable_size: 0,
160            manifest_size: 0,
161            sst_size: 0,
162            sst_num: 0,
163            index_size: 0,
164            region_manifest: RegionManifestInfo::Mito {
165                manifest_version: 0,
166                flushed_entry_id: 0,
167            },
168            data_topic_latest_entry_id: 0,
169            metadata_topic_latest_entry_id: 0,
170            written_bytes: 0,
171        }
172    }
173
174    #[tokio::test]
175    async fn test_handle_upgradable_follower() {
176        let datanode_id = 1;
177        let region_number = 1u32;
178        let table_id = 10;
179        let region_id = RegionId::new(table_id, region_number);
180        let another_region_id = RegionId::new(table_id, region_number + 1);
181        let peer = Peer::empty(datanode_id);
182        let follower_peer = Peer::empty(datanode_id + 1);
183        let table_info = new_test_table_info(table_id, vec![region_number]).into();
184
185        let region_routes = vec![RegionRoute {
186            region: Region::new_test(region_id),
187            leader_peer: Some(peer.clone()),
188            follower_peers: vec![follower_peer.clone()],
189            ..Default::default()
190        }];
191
192        let keeper = new_test_keeper();
193        let table_metadata_manager = keeper.table_metadata_manager();
194
195        table_metadata_manager
196            .create_table_metadata(
197                table_info,
198                TableRouteValue::physical(region_routes),
199                HashMap::default(),
200            )
201            .await
202            .unwrap();
203
204        let builder = MetasrvBuilder::new();
205        let metasrv = builder.build().await.unwrap();
206        let ctx = &mut metasrv.new_ctx();
207
208        let acc = &mut HeartbeatAccumulator::default();
209
210        acc.stat = Some(Stat {
211            id: peer.id,
212            region_stats: vec![
213                new_empty_region_stat(region_id, RegionRole::Follower),
214                new_empty_region_stat(another_region_id, RegionRole::Follower),
215            ],
216            ..Default::default()
217        });
218
219        let req = HeartbeatRequest {
220            duration_since_epoch: 1234,
221            ..Default::default()
222        };
223
224        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
225
226        let handler = RegionLeaseHandler::new(
227            distributed_time_constants::REGION_LEASE_SECS,
228            table_metadata_manager.clone(),
229            opening_region_keeper.clone(),
230            None,
231        );
232
233        handler.handle(&req, ctx, acc).await.unwrap();
234
235        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
236        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
237        assert_eq!(
238            acc.region_lease.as_ref().unwrap().closeable_region_ids,
239            vec![another_region_id]
240        );
241
242        let acc = &mut HeartbeatAccumulator::default();
243
244        acc.stat = Some(Stat {
245            id: follower_peer.id,
246            region_stats: vec![
247                new_empty_region_stat(region_id, RegionRole::Follower),
248                new_empty_region_stat(another_region_id, RegionRole::Follower),
249            ],
250            ..Default::default()
251        });
252
253        handler.handle(&req, ctx, acc).await.unwrap();
254
255        assert_eq!(
256            acc.region_lease.as_ref().unwrap().lease_seconds,
257            distributed_time_constants::REGION_LEASE_SECS
258        );
259
260        assert_region_lease(
261            acc,
262            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
263        );
264        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
265        assert_eq!(
266            acc.region_lease.as_ref().unwrap().closeable_region_ids,
267            vec![another_region_id]
268        );
269
270        let opening_region_id = RegionId::new(table_id, region_number + 2);
271        let _guard = opening_region_keeper
272            .register(follower_peer.id, opening_region_id)
273            .unwrap();
274
275        let acc = &mut HeartbeatAccumulator::default();
276
277        acc.stat = Some(Stat {
278            id: follower_peer.id,
279            region_stats: vec![
280                new_empty_region_stat(region_id, RegionRole::Follower),
281                new_empty_region_stat(another_region_id, RegionRole::Follower),
282                new_empty_region_stat(opening_region_id, RegionRole::Follower),
283            ],
284            ..Default::default()
285        });
286
287        handler.handle(&req, ctx, acc).await.unwrap();
288
289        assert_eq!(
290            acc.region_lease.as_ref().unwrap().lease_seconds,
291            distributed_time_constants::REGION_LEASE_SECS
292        );
293
294        assert_region_lease(
295            acc,
296            vec![
297                GrantedRegion::new(region_id, RegionRole::Follower),
298                GrantedRegion::new(opening_region_id, RegionRole::Follower),
299            ],
300        );
301        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
302        assert_eq!(
303            acc.region_lease.as_ref().unwrap().closeable_region_ids,
304            vec![another_region_id]
305        );
306    }
307
308    #[tokio::test]
309
310    async fn test_handle_downgradable_leader() {
311        let datanode_id = 1;
312        let region_number = 1u32;
313        let table_id = 10;
314        let region_id = RegionId::new(table_id, region_number);
315        let another_region_id = RegionId::new(table_id, region_number + 1);
316        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
317        let peer = Peer::empty(datanode_id);
318        let follower_peer = Peer::empty(datanode_id + 1);
319        let table_info = new_test_table_info(table_id, vec![region_number]).into();
320
321        let region_routes = vec![
322            RegionRoute {
323                region: Region::new_test(region_id),
324                leader_peer: Some(peer.clone()),
325                follower_peers: vec![follower_peer.clone()],
326                leader_state: Some(LeaderState::Downgrading),
327                leader_down_since: Some(1),
328            },
329            RegionRoute {
330                region: Region::new_test(another_region_id),
331                leader_peer: Some(peer.clone()),
332                ..Default::default()
333            },
334        ];
335
336        let keeper = new_test_keeper();
337        let table_metadata_manager = keeper.table_metadata_manager();
338
339        table_metadata_manager
340            .create_table_metadata(
341                table_info,
342                TableRouteValue::physical(region_routes),
343                HashMap::default(),
344            )
345            .await
346            .unwrap();
347
348        let builder = MetasrvBuilder::new();
349        let metasrv = builder.build().await.unwrap();
350        let ctx = &mut metasrv.new_ctx();
351
352        let req = HeartbeatRequest {
353            duration_since_epoch: 1234,
354            ..Default::default()
355        };
356
357        let acc = &mut HeartbeatAccumulator::default();
358
359        acc.stat = Some(Stat {
360            id: peer.id,
361            region_stats: vec![
362                new_empty_region_stat(region_id, RegionRole::Leader),
363                new_empty_region_stat(another_region_id, RegionRole::Leader),
364                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
365            ],
366            ..Default::default()
367        });
368
369        let handler = RegionLeaseHandler::new(
370            distributed_time_constants::REGION_LEASE_SECS,
371            table_metadata_manager.clone(),
372            Default::default(),
373            None,
374        );
375
376        handler.handle(&req, ctx, acc).await.unwrap();
377
378        assert_region_lease(
379            acc,
380            vec![
381                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
382                GrantedRegion::new(another_region_id, RegionRole::Leader),
383            ],
384        );
385        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
386    }
387
388    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
389        let region_lease = acc.region_lease.as_ref().unwrap().clone();
390        let granted: Vec<GrantedRegion> = region_lease
391            .regions
392            .into_iter()
393            .map(Into::into)
394            .collect::<Vec<_>>();
395
396        let granted = granted
397            .into_iter()
398            .map(|region| (region.region_id, region))
399            .collect::<HashMap<_, _>>();
400
401        let expected = expected
402            .into_iter()
403            .map(|region| (region.region_id, region))
404            .collect::<HashMap<_, _>>();
405
406        assert_eq!(granted, expected);
407    }
408}