meta_srv/handler/region_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_telemetry::error;
23use store_api::region_engine::GrantedRegion;
24use store_api::storage::RegionId;
25
26use crate::error::Result;
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29use crate::region::RegionLeaseKeeper;
30use crate::region::lease_keeper::{
31    RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
32};
33
34pub struct RegionLeaseHandler {
35    region_lease_seconds: u64,
36    region_lease_keeper: RegionLeaseKeeperRef,
37    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
38}
39
40pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
41
/// Hook for overriding how renewed region leases are turned into granted regions.
///
/// When a renewer is configured on [`RegionLeaseHandler`], it replaces the handler's
/// default behavior, which grants each renewed region with the role recorded in its
/// [`RegionLeaseInfo`].
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
    /// Converts the regions whose leases were renewed (keyed by region id) into the
    /// regions granted to the datanode in the heartbeat response.
    ///
    /// `ctx` is the context passed to the heartbeat handler for the current request.
    fn renew(
        &self,
        ctx: &mut Context,
        regions: HashMap<RegionId, RegionLeaseInfo>,
    ) -> Vec<GrantedRegion>;
}
49
50impl RegionLeaseHandler {
51    pub fn new(
52        region_lease_seconds: u64,
53        table_metadata_manager: TableMetadataManagerRef,
54        memory_region_keeper: MemoryRegionKeeperRef,
55        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
56    ) -> Self {
57        let region_lease_keeper =
58            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
59
60        Self {
61            region_lease_seconds,
62            region_lease_keeper: Arc::new(region_lease_keeper),
63            customized_region_lease_renewer,
64        }
65    }
66}
67
68#[async_trait]
69impl HeartbeatHandler for RegionLeaseHandler {
70    fn is_acceptable(&self, role: Role) -> bool {
71        role == Role::Datanode
72    }
73
74    async fn handle(
75        &self,
76        req: &HeartbeatRequest,
77        ctx: &mut Context,
78        acc: &mut HeartbeatAccumulator,
79    ) -> Result<HandleControl> {
80        let Some(stat) = acc.stat.as_ref() else {
81            return Ok(HandleControl::Continue);
82        };
83
84        let regions = stat.regions();
85        let datanode_id = stat.id;
86
87        match self
88            .region_lease_keeper
89            .renew_region_leases(datanode_id, &regions)
90            .await
91        {
92            Ok(RenewRegionLeasesResponse {
93                non_exists,
94                renewed,
95            }) => {
96                let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
97                    renewer
98                        .renew(ctx, renewed)
99                        .into_iter()
100                        .map(|region| region.into())
101                        .collect()
102                } else {
103                    renewed
104                        .into_iter()
105                        .map(|(region_id, region_lease_info)| {
106                            GrantedRegion::new(region_id, region_lease_info.role).into()
107                        })
108                        .collect::<Vec<_>>()
109                };
110
111                acc.region_lease = Some(RegionLease {
112                    regions: renewed,
113                    duration_since_epoch: req.duration_since_epoch,
114                    lease_seconds: self.region_lease_seconds,
115                    closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
116                });
117                acc.inactive_region_ids = non_exists;
118            }
119            Err(e) => {
120                error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
121                // If we throw error here, the datanode will be marked as failure by region failure handler.
122                // So we only log the error and continue.
123            }
124        }
125
126        Ok(HandleControl::Continue)
127    }
128}
129
#[cfg(test)]
mod test {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::distributed_time_constants::default_distributed_time_constants;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::metasrv::builder::MetasrvBuilder;

    /// Creates a [`RegionLeaseKeeper`] over an in-memory kv backend and an empty memory
    /// region keeper. The tests use it to get at its table metadata manager.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
    }

    /// Creates a [`RegionStat`] for `region_id` reporting `role`, with every size and
    /// count metric set to zero.
    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
        RegionStat {
            id: region_id,
            role,
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            engine: String::new(),
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
                file_removed_cnt: 0,
            },
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// A region reported as follower is granted the role recorded in the table route.
    /// On the leader peer it becomes leader; on the follower peer it stays follower.
    /// Regions missing from the route are reported as inactive and closeable. A region
    /// registered in the opening region keeper is still granted.
    #[tokio::test]
    async fn test_handle_upgradable_follower() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            follower_peers: vec![follower_peer.clone()],
            ..Default::default()
        }];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            opening_region_keeper.clone(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        // `closeable_region_ids` holds raw `u64` ids, so compare against `as_u64()`.
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id.as_u64()]
        );

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(
            acc,
            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id.as_u64()]
        );

        let opening_region_id = RegionId::new(table_id, region_number + 2);
        let _guard = opening_region_keeper
            .register(follower_peer.id, opening_region_id)
            .unwrap();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
                new_empty_region_stat(opening_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::Follower),
                GrantedRegion::new(opening_region_id, RegionRole::Follower),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id.as_u64()]
        );
    }

    /// A leader whose route is in `LeaderState::Downgrading` is granted
    /// `DowngradingLeader`. Other routed regions keep `Leader`, and a region missing
    /// from the route is reported as inactive and closeable.
    #[tokio::test]
    async fn test_handle_downgradable_leader() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id);

        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(peer.clone()),
                follower_peers: vec![follower_peer.clone()],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(1),
                write_route_policy: None,
            },
            RegionRoute {
                region: Region::new_test(another_region_id),
                leader_peer: Some(peer.clone()),
                ..Default::default()
            },
        ];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });

        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
                GrantedRegion::new(another_region_id, RegionRole::Leader),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![no_exist_region_id.as_u64()]
        );
    }

    /// Asserts that the accumulated lease grants exactly `expected`, ignoring order.
    /// Granted regions are compared by region id.
    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
        let region_lease = acc.region_lease.as_ref().unwrap().clone();
        let granted: Vec<GrantedRegion> = region_lease
            .regions
            .into_iter()
            .map(Into::into)
            .collect::<Vec<_>>();

        let granted = granted
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        let expected = expected
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        assert_eq!(granted, expected);
    }

    /// When the kv backend fails during renewal, the handler still returns `Ok`. It
    /// leaves the accumulator without a region lease and without inactive regions.
    #[tokio::test]
    async fn test_handle_renew_region_lease_failure() {
        common_telemetry::init_default_ut_logging();
        let kv = MockKvBackendBuilder::default()
            .batch_get_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }) as _)
            .build()
            .unwrap();
        let kvbackend = Arc::new(kv);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));

        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();
        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });
        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );
        handler.handle(&req, ctx, acc).await.unwrap();

        assert!(acc.region_lease.is_none());
        assert!(acc.inactive_region_ids.is_empty());
    }
}