meta_srv/handler/
region_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_telemetry::error;
23use store_api::region_engine::GrantedRegion;
24use store_api::storage::RegionId;
25
26use crate::error::Result;
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29use crate::region::RegionLeaseKeeper;
30use crate::region::lease_keeper::{
31    RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
32};
33
/// Heartbeat handler that renews the leases of the regions reported in a heartbeat
/// and records the outcome on the heartbeat accumulator.
pub struct RegionLeaseHandler {
    /// Lease duration in seconds, copied into every `RegionLease` this handler emits.
    region_lease_seconds: u64,
    /// Keeper that is asked to renew the leases of the reported regions.
    region_lease_keeper: RegionLeaseKeeperRef,
    /// Optional hook; when set, it builds the granted regions instead of the
    /// default one-to-one mapping from renewed leases.
    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
}
39
/// Reference-counted handle to a [`CustomizedRegionLeaseRenewer`] trait object.
pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
41
/// Hook for customizing how renewed leases are turned into granted regions.
///
/// Implementations must be `Send + Sync` because they are stored behind an `Arc`
/// (see [`CustomizedRegionLeaseRenewerRef`]).
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
    /// Builds the regions to grant from the leases that were renewed.
    ///
    /// `ctx` is the heartbeat handling context and `regions` maps each renewed
    /// region id to its lease info. The returned list is what ends up in the
    /// lease sent back for this heartbeat.
    fn renew(
        &self,
        ctx: &mut Context,
        regions: HashMap<RegionId, RegionLeaseInfo>,
    ) -> Vec<GrantedRegion>;
}
49
50impl RegionLeaseHandler {
51    pub fn new(
52        region_lease_seconds: u64,
53        table_metadata_manager: TableMetadataManagerRef,
54        memory_region_keeper: MemoryRegionKeeperRef,
55        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
56    ) -> Self {
57        let region_lease_keeper =
58            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
59
60        Self {
61            region_lease_seconds,
62            region_lease_keeper: Arc::new(region_lease_keeper),
63            customized_region_lease_renewer,
64        }
65    }
66}
67
68#[async_trait]
69impl HeartbeatHandler for RegionLeaseHandler {
70    fn is_acceptable(&self, role: Role) -> bool {
71        role == Role::Datanode
72    }
73
74    async fn handle(
75        &self,
76        req: &HeartbeatRequest,
77        ctx: &mut Context,
78        acc: &mut HeartbeatAccumulator,
79    ) -> Result<HandleControl> {
80        let Some(stat) = acc.stat.as_ref() else {
81            return Ok(HandleControl::Continue);
82        };
83
84        let regions = stat.regions();
85        let datanode_id = stat.id;
86
87        match self
88            .region_lease_keeper
89            .renew_region_leases(datanode_id, &regions)
90            .await
91        {
92            Ok(RenewRegionLeasesResponse {
93                non_exists,
94                renewed,
95            }) => {
96                let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
97                    renewer
98                        .renew(ctx, renewed)
99                        .into_iter()
100                        .map(|region| region.into())
101                        .collect()
102                } else {
103                    renewed
104                        .into_iter()
105                        .map(|(region_id, region_lease_info)| {
106                            GrantedRegion::new(region_id, region_lease_info.role).into()
107                        })
108                        .collect::<Vec<_>>()
109                };
110
111                acc.region_lease = Some(RegionLease {
112                    regions: renewed,
113                    duration_since_epoch: req.duration_since_epoch,
114                    lease_seconds: self.region_lease_seconds,
115                    closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
116                });
117                acc.inactive_region_ids = non_exists;
118            }
119            Err(e) => {
120                error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
121                // If we throw error here, the datanode will be marked as failure by region failure handler.
122                // So we only log the error and continue.
123            }
124        }
125
126        Ok(HandleControl::Continue)
127    }
128}
129
130#[cfg(test)]
131mod test {
132    use std::collections::{HashMap, HashSet};
133    use std::sync::Arc;
134
135    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
136    use common_meta::distributed_time_constants;
137    use common_meta::key::TableMetadataManager;
138    use common_meta::key::table_route::TableRouteValue;
139    use common_meta::key::test_utils::new_test_table_info;
140    use common_meta::kv_backend::memory::MemoryKvBackend;
141    use common_meta::peer::Peer;
142    use common_meta::region_keeper::MemoryRegionKeeper;
143    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
144    use store_api::region_engine::RegionRole;
145    use store_api::storage::RegionId;
146
147    use super::*;
148    use crate::metasrv::builder::MetasrvBuilder;
149
150    fn new_test_keeper() -> RegionLeaseKeeper {
151        let store = Arc::new(MemoryKvBackend::new());
152
153        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
154
155        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
156        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
157    }
158
159    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
160        RegionStat {
161            id: region_id,
162            role,
163            rcus: 0,
164            wcus: 0,
165            approximate_bytes: 0,
166            engine: String::new(),
167            num_rows: 0,
168            memtable_size: 0,
169            manifest_size: 0,
170            sst_size: 0,
171            sst_num: 0,
172            index_size: 0,
173            region_manifest: RegionManifestInfo::Mito {
174                manifest_version: 0,
175                flushed_entry_id: 0,
176            },
177            data_topic_latest_entry_id: 0,
178            metadata_topic_latest_entry_id: 0,
179            written_bytes: 0,
180        }
181    }
182
183    #[tokio::test]
184    async fn test_handle_upgradable_follower() {
185        let datanode_id = 1;
186        let region_number = 1u32;
187        let table_id = 10;
188        let region_id = RegionId::new(table_id, region_number);
189        let another_region_id = RegionId::new(table_id, region_number + 1);
190        let peer = Peer::empty(datanode_id);
191        let follower_peer = Peer::empty(datanode_id + 1);
192        let table_info = new_test_table_info(table_id, vec![region_number]).into();
193
194        let region_routes = vec![RegionRoute {
195            region: Region::new_test(region_id),
196            leader_peer: Some(peer.clone()),
197            follower_peers: vec![follower_peer.clone()],
198            ..Default::default()
199        }];
200
201        let keeper = new_test_keeper();
202        let table_metadata_manager = keeper.table_metadata_manager();
203
204        table_metadata_manager
205            .create_table_metadata(
206                table_info,
207                TableRouteValue::physical(region_routes),
208                HashMap::default(),
209            )
210            .await
211            .unwrap();
212
213        let builder = MetasrvBuilder::new();
214        let metasrv = builder.build().await.unwrap();
215        let ctx = &mut metasrv.new_ctx();
216
217        let acc = &mut HeartbeatAccumulator::default();
218
219        acc.stat = Some(Stat {
220            id: peer.id,
221            region_stats: vec![
222                new_empty_region_stat(region_id, RegionRole::Follower),
223                new_empty_region_stat(another_region_id, RegionRole::Follower),
224            ],
225            ..Default::default()
226        });
227
228        let req = HeartbeatRequest {
229            duration_since_epoch: 1234,
230            ..Default::default()
231        };
232
233        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
234
235        let handler = RegionLeaseHandler::new(
236            distributed_time_constants::REGION_LEASE_SECS,
237            table_metadata_manager.clone(),
238            opening_region_keeper.clone(),
239            None,
240        );
241
242        handler.handle(&req, ctx, acc).await.unwrap();
243
244        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
245        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
246        assert_eq!(
247            acc.region_lease.as_ref().unwrap().closeable_region_ids,
248            vec![another_region_id]
249        );
250
251        let acc = &mut HeartbeatAccumulator::default();
252
253        acc.stat = Some(Stat {
254            id: follower_peer.id,
255            region_stats: vec![
256                new_empty_region_stat(region_id, RegionRole::Follower),
257                new_empty_region_stat(another_region_id, RegionRole::Follower),
258            ],
259            ..Default::default()
260        });
261
262        handler.handle(&req, ctx, acc).await.unwrap();
263
264        assert_eq!(
265            acc.region_lease.as_ref().unwrap().lease_seconds,
266            distributed_time_constants::REGION_LEASE_SECS
267        );
268
269        assert_region_lease(
270            acc,
271            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
272        );
273        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
274        assert_eq!(
275            acc.region_lease.as_ref().unwrap().closeable_region_ids,
276            vec![another_region_id]
277        );
278
279        let opening_region_id = RegionId::new(table_id, region_number + 2);
280        let _guard = opening_region_keeper
281            .register(follower_peer.id, opening_region_id)
282            .unwrap();
283
284        let acc = &mut HeartbeatAccumulator::default();
285
286        acc.stat = Some(Stat {
287            id: follower_peer.id,
288            region_stats: vec![
289                new_empty_region_stat(region_id, RegionRole::Follower),
290                new_empty_region_stat(another_region_id, RegionRole::Follower),
291                new_empty_region_stat(opening_region_id, RegionRole::Follower),
292            ],
293            ..Default::default()
294        });
295
296        handler.handle(&req, ctx, acc).await.unwrap();
297
298        assert_eq!(
299            acc.region_lease.as_ref().unwrap().lease_seconds,
300            distributed_time_constants::REGION_LEASE_SECS
301        );
302
303        assert_region_lease(
304            acc,
305            vec![
306                GrantedRegion::new(region_id, RegionRole::Follower),
307                GrantedRegion::new(opening_region_id, RegionRole::Follower),
308            ],
309        );
310        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
311        assert_eq!(
312            acc.region_lease.as_ref().unwrap().closeable_region_ids,
313            vec![another_region_id]
314        );
315    }
316
317    #[tokio::test]
318
319    async fn test_handle_downgradable_leader() {
320        let datanode_id = 1;
321        let region_number = 1u32;
322        let table_id = 10;
323        let region_id = RegionId::new(table_id, region_number);
324        let another_region_id = RegionId::new(table_id, region_number + 1);
325        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
326        let peer = Peer::empty(datanode_id);
327        let follower_peer = Peer::empty(datanode_id + 1);
328        let table_info = new_test_table_info(table_id, vec![region_number]).into();
329
330        let region_routes = vec![
331            RegionRoute {
332                region: Region::new_test(region_id),
333                leader_peer: Some(peer.clone()),
334                follower_peers: vec![follower_peer.clone()],
335                leader_state: Some(LeaderState::Downgrading),
336                leader_down_since: Some(1),
337            },
338            RegionRoute {
339                region: Region::new_test(another_region_id),
340                leader_peer: Some(peer.clone()),
341                ..Default::default()
342            },
343        ];
344
345        let keeper = new_test_keeper();
346        let table_metadata_manager = keeper.table_metadata_manager();
347
348        table_metadata_manager
349            .create_table_metadata(
350                table_info,
351                TableRouteValue::physical(region_routes),
352                HashMap::default(),
353            )
354            .await
355            .unwrap();
356
357        let builder = MetasrvBuilder::new();
358        let metasrv = builder.build().await.unwrap();
359        let ctx = &mut metasrv.new_ctx();
360
361        let req = HeartbeatRequest {
362            duration_since_epoch: 1234,
363            ..Default::default()
364        };
365
366        let acc = &mut HeartbeatAccumulator::default();
367
368        acc.stat = Some(Stat {
369            id: peer.id,
370            region_stats: vec![
371                new_empty_region_stat(region_id, RegionRole::Leader),
372                new_empty_region_stat(another_region_id, RegionRole::Leader),
373                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
374            ],
375            ..Default::default()
376        });
377
378        let handler = RegionLeaseHandler::new(
379            distributed_time_constants::REGION_LEASE_SECS,
380            table_metadata_manager.clone(),
381            Default::default(),
382            None,
383        );
384
385        handler.handle(&req, ctx, acc).await.unwrap();
386
387        assert_region_lease(
388            acc,
389            vec![
390                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
391                GrantedRegion::new(another_region_id, RegionRole::Leader),
392            ],
393        );
394        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
395    }
396
397    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
398        let region_lease = acc.region_lease.as_ref().unwrap().clone();
399        let granted: Vec<GrantedRegion> = region_lease
400            .regions
401            .into_iter()
402            .map(Into::into)
403            .collect::<Vec<_>>();
404
405        let granted = granted
406            .into_iter()
407            .map(|region| (region.region_id, region))
408            .collect::<HashMap<_, _>>();
409
410        let expected = expected
411            .into_iter()
412            .map(|region| (region.region_id, region))
413            .collect::<HashMap<_, _>>();
414
415        assert_eq!(granted, expected);
416    }
417}