meta_srv/handler/
region_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use store_api::region_engine::GrantedRegion;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28use crate::region::lease_keeper::{
29    RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
30};
31use crate::region::RegionLeaseKeeper;
32
/// Heartbeat handler that renews the leases of the regions reported by a datanode.
///
/// The outcome of each renewal is stored on the [`HeartbeatAccumulator`] as a [`RegionLease`].
pub struct RegionLeaseHandler {
    /// Lease duration, in seconds, copied into every [`RegionLease`] this handler produces.
    region_lease_seconds: u64,
    /// Keeper asked to renew the leases of the regions a datanode reports.
    region_lease_keeper: RegionLeaseKeeperRef,
    /// Optional hook whose output, when present, is used as the granted regions instead of
    /// the default conversion of the renewed leases.
    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
}
38
/// Reference-counted handle to a [`CustomizedRegionLeaseRenewer`] trait object.
pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
40
/// Hook for overriding how renewed region leases are turned into granted regions.
///
/// When a renewer is installed on [`RegionLeaseHandler`], the regions it returns are used as
/// the granted regions of the lease instead of the handler's default conversion.
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
    /// Returns the regions to grant for the current heartbeat.
    ///
    /// `ctx` is the context the heartbeat handler was invoked with, and `regions` maps each
    /// renewed region to its lease info.
    fn renew(
        &self,
        ctx: &mut Context,
        regions: HashMap<RegionId, RegionLeaseInfo>,
    ) -> Vec<GrantedRegion>;
}
48
49impl RegionLeaseHandler {
50    pub fn new(
51        region_lease_seconds: u64,
52        table_metadata_manager: TableMetadataManagerRef,
53        memory_region_keeper: MemoryRegionKeeperRef,
54        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
55    ) -> Self {
56        let region_lease_keeper =
57            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
58
59        Self {
60            region_lease_seconds,
61            region_lease_keeper: Arc::new(region_lease_keeper),
62            customized_region_lease_renewer,
63        }
64    }
65}
66
67#[async_trait]
68impl HeartbeatHandler for RegionLeaseHandler {
69    fn is_acceptable(&self, role: Role) -> bool {
70        role == Role::Datanode
71    }
72
73    async fn handle(
74        &self,
75        req: &HeartbeatRequest,
76        ctx: &mut Context,
77        acc: &mut HeartbeatAccumulator,
78    ) -> Result<HandleControl> {
79        let Some(stat) = acc.stat.as_ref() else {
80            return Ok(HandleControl::Continue);
81        };
82
83        let regions = stat.regions();
84        let datanode_id = stat.id;
85
86        let RenewRegionLeasesResponse {
87            non_exists,
88            renewed,
89        } = self
90            .region_lease_keeper
91            .renew_region_leases(datanode_id, &regions)
92            .await?;
93
94        let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
95            renewer
96                .renew(ctx, renewed)
97                .into_iter()
98                .map(|region| region.into())
99                .collect()
100        } else {
101            renewed
102                .into_iter()
103                .map(|(region_id, region_lease_info)| {
104                    GrantedRegion::new(region_id, region_lease_info.role).into()
105                })
106                .collect::<Vec<_>>()
107        };
108
109        acc.region_lease = Some(RegionLease {
110            regions: renewed,
111            duration_since_epoch: req.duration_since_epoch,
112            lease_seconds: self.region_lease_seconds,
113            closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
114        });
115        acc.inactive_region_ids = non_exists;
116
117        Ok(HandleControl::Continue)
118    }
119}
120
121#[cfg(test)]
122mod test {
123    use std::collections::{HashMap, HashSet};
124    use std::sync::Arc;
125
126    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
127    use common_meta::distributed_time_constants;
128    use common_meta::key::table_route::TableRouteValue;
129    use common_meta::key::test_utils::new_test_table_info;
130    use common_meta::key::TableMetadataManager;
131    use common_meta::kv_backend::memory::MemoryKvBackend;
132    use common_meta::peer::Peer;
133    use common_meta::region_keeper::MemoryRegionKeeper;
134    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
135    use store_api::region_engine::RegionRole;
136    use store_api::storage::RegionId;
137
138    use super::*;
139    use crate::metasrv::builder::MetasrvBuilder;
140
141    fn new_test_keeper() -> RegionLeaseKeeper {
142        let store = Arc::new(MemoryKvBackend::new());
143
144        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
145
146        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
147        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
148    }
149
150    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
151        RegionStat {
152            id: region_id,
153            role,
154            rcus: 0,
155            wcus: 0,
156            approximate_bytes: 0,
157            engine: String::new(),
158            num_rows: 0,
159            memtable_size: 0,
160            manifest_size: 0,
161            sst_size: 0,
162            index_size: 0,
163            region_manifest: RegionManifestInfo::Mito {
164                manifest_version: 0,
165                flushed_entry_id: 0,
166            },
167            data_topic_latest_entry_id: 0,
168            metadata_topic_latest_entry_id: 0,
169        }
170    }
171
172    #[tokio::test]
173    async fn test_handle_upgradable_follower() {
174        let datanode_id = 1;
175        let region_number = 1u32;
176        let table_id = 10;
177        let region_id = RegionId::new(table_id, region_number);
178        let another_region_id = RegionId::new(table_id, region_number + 1);
179        let peer = Peer::empty(datanode_id);
180        let follower_peer = Peer::empty(datanode_id + 1);
181        let table_info = new_test_table_info(table_id, vec![region_number]).into();
182
183        let region_routes = vec![RegionRoute {
184            region: Region::new_test(region_id),
185            leader_peer: Some(peer.clone()),
186            follower_peers: vec![follower_peer.clone()],
187            ..Default::default()
188        }];
189
190        let keeper = new_test_keeper();
191        let table_metadata_manager = keeper.table_metadata_manager();
192
193        table_metadata_manager
194            .create_table_metadata(
195                table_info,
196                TableRouteValue::physical(region_routes),
197                HashMap::default(),
198            )
199            .await
200            .unwrap();
201
202        let builder = MetasrvBuilder::new();
203        let metasrv = builder.build().await.unwrap();
204        let ctx = &mut metasrv.new_ctx();
205
206        let acc = &mut HeartbeatAccumulator::default();
207
208        acc.stat = Some(Stat {
209            id: peer.id,
210            region_stats: vec![
211                new_empty_region_stat(region_id, RegionRole::Follower),
212                new_empty_region_stat(another_region_id, RegionRole::Follower),
213            ],
214            ..Default::default()
215        });
216
217        let req = HeartbeatRequest {
218            duration_since_epoch: 1234,
219            ..Default::default()
220        };
221
222        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
223
224        let handler = RegionLeaseHandler::new(
225            distributed_time_constants::REGION_LEASE_SECS,
226            table_metadata_manager.clone(),
227            opening_region_keeper.clone(),
228            None,
229        );
230
231        handler.handle(&req, ctx, acc).await.unwrap();
232
233        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
234        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
235        assert_eq!(
236            acc.region_lease.as_ref().unwrap().closeable_region_ids,
237            vec![another_region_id]
238        );
239
240        let acc = &mut HeartbeatAccumulator::default();
241
242        acc.stat = Some(Stat {
243            id: follower_peer.id,
244            region_stats: vec![
245                new_empty_region_stat(region_id, RegionRole::Follower),
246                new_empty_region_stat(another_region_id, RegionRole::Follower),
247            ],
248            ..Default::default()
249        });
250
251        handler.handle(&req, ctx, acc).await.unwrap();
252
253        assert_eq!(
254            acc.region_lease.as_ref().unwrap().lease_seconds,
255            distributed_time_constants::REGION_LEASE_SECS
256        );
257
258        assert_region_lease(
259            acc,
260            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
261        );
262        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
263        assert_eq!(
264            acc.region_lease.as_ref().unwrap().closeable_region_ids,
265            vec![another_region_id]
266        );
267
268        let opening_region_id = RegionId::new(table_id, region_number + 2);
269        let _guard = opening_region_keeper
270            .register(follower_peer.id, opening_region_id)
271            .unwrap();
272
273        let acc = &mut HeartbeatAccumulator::default();
274
275        acc.stat = Some(Stat {
276            id: follower_peer.id,
277            region_stats: vec![
278                new_empty_region_stat(region_id, RegionRole::Follower),
279                new_empty_region_stat(another_region_id, RegionRole::Follower),
280                new_empty_region_stat(opening_region_id, RegionRole::Follower),
281            ],
282            ..Default::default()
283        });
284
285        handler.handle(&req, ctx, acc).await.unwrap();
286
287        assert_eq!(
288            acc.region_lease.as_ref().unwrap().lease_seconds,
289            distributed_time_constants::REGION_LEASE_SECS
290        );
291
292        assert_region_lease(
293            acc,
294            vec![
295                GrantedRegion::new(region_id, RegionRole::Follower),
296                GrantedRegion::new(opening_region_id, RegionRole::Follower),
297            ],
298        );
299        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
300        assert_eq!(
301            acc.region_lease.as_ref().unwrap().closeable_region_ids,
302            vec![another_region_id]
303        );
304    }
305
306    #[tokio::test]
307
308    async fn test_handle_downgradable_leader() {
309        let datanode_id = 1;
310        let region_number = 1u32;
311        let table_id = 10;
312        let region_id = RegionId::new(table_id, region_number);
313        let another_region_id = RegionId::new(table_id, region_number + 1);
314        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
315        let peer = Peer::empty(datanode_id);
316        let follower_peer = Peer::empty(datanode_id + 1);
317        let table_info = new_test_table_info(table_id, vec![region_number]).into();
318
319        let region_routes = vec![
320            RegionRoute {
321                region: Region::new_test(region_id),
322                leader_peer: Some(peer.clone()),
323                follower_peers: vec![follower_peer.clone()],
324                leader_state: Some(LeaderState::Downgrading),
325                leader_down_since: Some(1),
326            },
327            RegionRoute {
328                region: Region::new_test(another_region_id),
329                leader_peer: Some(peer.clone()),
330                ..Default::default()
331            },
332        ];
333
334        let keeper = new_test_keeper();
335        let table_metadata_manager = keeper.table_metadata_manager();
336
337        table_metadata_manager
338            .create_table_metadata(
339                table_info,
340                TableRouteValue::physical(region_routes),
341                HashMap::default(),
342            )
343            .await
344            .unwrap();
345
346        let builder = MetasrvBuilder::new();
347        let metasrv = builder.build().await.unwrap();
348        let ctx = &mut metasrv.new_ctx();
349
350        let req = HeartbeatRequest {
351            duration_since_epoch: 1234,
352            ..Default::default()
353        };
354
355        let acc = &mut HeartbeatAccumulator::default();
356
357        acc.stat = Some(Stat {
358            id: peer.id,
359            region_stats: vec![
360                new_empty_region_stat(region_id, RegionRole::Leader),
361                new_empty_region_stat(another_region_id, RegionRole::Leader),
362                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
363            ],
364            ..Default::default()
365        });
366
367        let handler = RegionLeaseHandler::new(
368            distributed_time_constants::REGION_LEASE_SECS,
369            table_metadata_manager.clone(),
370            Default::default(),
371            None,
372        );
373
374        handler.handle(&req, ctx, acc).await.unwrap();
375
376        assert_region_lease(
377            acc,
378            vec![
379                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
380                GrantedRegion::new(another_region_id, RegionRole::Leader),
381            ],
382        );
383        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
384    }
385
386    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
387        let region_lease = acc.region_lease.as_ref().unwrap().clone();
388        let granted: Vec<GrantedRegion> = region_lease
389            .regions
390            .into_iter()
391            .map(Into::into)
392            .collect::<Vec<_>>();
393
394        let granted = granted
395            .into_iter()
396            .map(|region| (region.region_id, region))
397            .collect::<HashMap<_, _>>();
398
399        let expected = expected
400            .into_iter()
401            .map(|region| (region.region_id, region))
402            .collect::<HashMap<_, _>>();
403
404        assert_eq!(granted, expected);
405    }
406}