meta_srv/handler/
region_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_telemetry::error;
23use store_api::region_engine::GrantedRegion;
24use store_api::storage::RegionId;
25
26use crate::error::Result;
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29use crate::region::RegionLeaseKeeper;
30use crate::region::lease_keeper::{
31    RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
32};
33
34pub struct RegionLeaseHandler {
35    region_lease_seconds: u64,
36    region_lease_keeper: RegionLeaseKeeperRef,
37    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
38}
39
40pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
41
42pub trait CustomizedRegionLeaseRenewer: Send + Sync {
43    fn renew(
44        &self,
45        ctx: &mut Context,
46        regions: HashMap<RegionId, RegionLeaseInfo>,
47    ) -> Vec<GrantedRegion>;
48}
49
50impl RegionLeaseHandler {
51    pub fn new(
52        region_lease_seconds: u64,
53        table_metadata_manager: TableMetadataManagerRef,
54        memory_region_keeper: MemoryRegionKeeperRef,
55        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
56    ) -> Self {
57        let region_lease_keeper =
58            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
59
60        Self {
61            region_lease_seconds,
62            region_lease_keeper: Arc::new(region_lease_keeper),
63            customized_region_lease_renewer,
64        }
65    }
66}
67
68#[async_trait]
69impl HeartbeatHandler for RegionLeaseHandler {
70    fn is_acceptable(&self, role: Role) -> bool {
71        role == Role::Datanode
72    }
73
74    async fn handle(
75        &self,
76        req: &HeartbeatRequest,
77        ctx: &mut Context,
78        acc: &mut HeartbeatAccumulator,
79    ) -> Result<HandleControl> {
80        let Some(stat) = acc.stat.as_ref() else {
81            return Ok(HandleControl::Continue);
82        };
83
84        let regions = stat.regions();
85        let datanode_id = stat.id;
86
87        match self
88            .region_lease_keeper
89            .renew_region_leases(datanode_id, &regions)
90            .await
91        {
92            Ok(RenewRegionLeasesResponse {
93                non_exists,
94                renewed,
95            }) => {
96                let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
97                    renewer
98                        .renew(ctx, renewed)
99                        .into_iter()
100                        .map(|region| region.into())
101                        .collect()
102                } else {
103                    renewed
104                        .into_iter()
105                        .map(|(region_id, region_lease_info)| {
106                            GrantedRegion::new(region_id, region_lease_info.role).into()
107                        })
108                        .collect::<Vec<_>>()
109                };
110
111                acc.region_lease = Some(RegionLease {
112                    regions: renewed,
113                    duration_since_epoch: req.duration_since_epoch,
114                    lease_seconds: self.region_lease_seconds,
115                    closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
116                });
117                acc.inactive_region_ids = non_exists;
118            }
119            Err(e) => {
120                error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
121                // If we throw error here, the datanode will be marked as failure by region failure handler.
122                // So we only log the error and continue.
123            }
124        }
125
126        Ok(HandleControl::Continue)
127    }
128}
129
#[cfg(test)]
mod test {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::distributed_time_constants::default_distributed_time_constants;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::metasrv::builder::MetasrvBuilder;

    /// Builds a lease keeper backed by an in-memory kv backend.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
    }

    /// Builds a region stat with the given id and role and all counters zeroed.
    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
        RegionStat {
            id: region_id,
            role,
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            engine: String::new(),
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
                file_removed_cnt: 0,
            },
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// Test renewer that grants every renewed region as a follower, whatever role
    /// the lease keeper computed.
    struct FollowerOnlyRenewer;

    impl CustomizedRegionLeaseRenewer for FollowerOnlyRenewer {
        fn renew(
            &self,
            _ctx: &mut Context,
            regions: HashMap<RegionId, RegionLeaseInfo>,
        ) -> Vec<GrantedRegion> {
            regions
                .into_keys()
                .map(|region_id| GrantedRegion::new(region_id, RegionRole::Follower))
                .collect()
        }
    }

    #[tokio::test]
    async fn test_handle_upgradable_follower() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            follower_peers: vec![follower_peer.clone()],
            ..Default::default()
        }];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            opening_region_keeper.clone(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(
            acc,
            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        let opening_region_id = RegionId::new(table_id, region_number + 2);
        let _guard = opening_region_keeper
            .register(follower_peer.id, opening_region_id)
            .unwrap();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
                new_empty_region_stat(opening_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::Follower),
                GrantedRegion::new(opening_region_id, RegionRole::Follower),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );
    }

    #[tokio::test]
    async fn test_handle_downgradable_leader() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(peer.clone()),
                follower_peers: vec![follower_peer.clone()],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(1),
            },
            RegionRoute {
                region: Region::new_test(another_region_id),
                leader_peer: Some(peer.clone()),
                ..Default::default()
            },
        ];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });

        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
                GrantedRegion::new(another_region_id, RegionRole::Leader),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
        // Regions unknown to the metadata must also be reported as closeable.
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![no_exist_region_id]
        );
    }

    #[tokio::test]
    async fn test_handle_with_customized_renewer() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let peer = Peer::empty(datanode_id);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            ..Default::default()
        }];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![new_empty_region_stat(region_id, RegionRole::Leader)],
            ..Default::default()
        });

        let renewer: CustomizedRegionLeaseRenewerRef = Arc::new(FollowerOnlyRenewer);
        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            Some(renewer),
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        // Without the customized renewer this region would be granted as `Leader`.
        assert_region_lease(
            acc,
            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
        );
        assert!(acc.inactive_region_ids.is_empty());
        assert_eq!(
            acc.region_lease.as_ref().unwrap().duration_since_epoch,
            1234
        );
    }

    /// Asserts that the accumulated region lease grants exactly `expected`, ignoring order.
    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
        let region_lease = acc.region_lease.as_ref().unwrap().clone();
        let granted: Vec<GrantedRegion> = region_lease
            .regions
            .into_iter()
            .map(Into::into)
            .collect::<Vec<_>>();

        let granted = granted
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        let expected = expected
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        assert_eq!(granted, expected);
    }

    #[tokio::test]
    async fn test_handle_renew_region_lease_failure() {
        common_telemetry::init_default_ut_logging();
        let kv = MockKvBackendBuilder::default()
            .batch_get_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }) as _)
            .build()
            .unwrap();
        let kvbackend = Arc::new(kv);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));

        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();
        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });
        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager,
            Default::default(),
            None,
        );
        handler.handle(&req, ctx, acc).await.unwrap();

        // A renewal failure is swallowed: no lease is produced and nothing is marked inactive.
        assert!(acc.region_lease.is_none());
        assert!(acc.inactive_region_ids.is_empty());
    }
}