meta_srv/handler/
collect_leader_region_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
23pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27    fn is_acceptable(&self, role: Role) -> bool {
28        role == Role::Datanode
29    }
30
31    async fn handle(
32        &self,
33        _req: &HeartbeatRequest,
34        ctx: &mut Context,
35        acc: &mut HeartbeatAccumulator,
36    ) -> Result<HandleControl> {
37        let Some(current_stat) = acc.stat.as_ref() else {
38            return Ok(HandleControl::Continue);
39        };
40
41        let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42        for stat in current_stat.region_stats.iter() {
43            if stat.role != RegionRole::Leader {
44                continue;
45            }
46
47            let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48            let value = LeaderRegion {
49                datanode_id: current_stat.id,
50                manifest,
51            };
52            key_values.push((stat.id, value));
53        }
54        ctx.leader_region_registry.batch_put(key_values);
55
56        Ok(HandleControl::Continue)
57    }
58}
59
#[cfg(test)]
mod tests {
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a Mito `RegionStat` with the given id, manifest version and role;
    /// every other numeric field is zero.
    fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
        let region_manifest = RegionManifestInfo::Mito {
            manifest_version,
            flushed_entry_id: 0,
        };

        RegionStat {
            id,
            region_manifest,
            role,
            engine: "mito".to_string(),
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// The registry entry expected for a Mito leader region reported by datanode 1.
    fn leader_on_datanode_1(manifest_version: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id: 0,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[tokio::test]
    async fn test_handle_collect_leader_region() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let handler = CollectLeaderRegionHandler;

        let leader_id = RegionId::new(1, 1);
        let follower_id = RegionId::new(1, 2);

        // First heartbeat: one leader and one follower; only the leader is registered.
        let first_stat = Stat {
            id: 1,
            region_stats: vec![
                new_region_stat(leader_id, 1, RegionRole::Leader),
                new_region_stat(follower_id, 2, RegionRole::Follower),
            ],
            addr: "127.0.0.1:0000".to_string(),
            region_num: 2,
            ..Default::default()
        };
        let mut acc = HeartbeatAccumulator {
            stat: Some(first_stat),
            ..Default::default()
        };

        let control = handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        assert_eq!(control, HandleControl::Continue);

        let regions = ctx
            .leader_region_registry
            .batch_get(vec![leader_id, follower_id].into_iter());
        assert_eq!(regions.len(), 1);
        assert_eq!(regions.get(&leader_id), Some(&leader_on_datanode_1(1)));

        // Follow-up heartbeats as (reported manifest version, version expected in the
        // registry afterwards): a newer version replaces the stored one, while an
        // older version leaves the stored one in place.
        for (reported_version, expected_version) in [(2, 2), (1, 2)] {
            acc.stat = Some(Stat {
                id: 1,
                region_stats: vec![new_region_stat(
                    leader_id,
                    reported_version,
                    RegionRole::Leader,
                )],
                timestamp_millis: 0,
                addr: "127.0.0.1:0000".to_string(),
                region_num: 1,
                node_epoch: 0,
                ..Default::default()
            });

            let control = handler
                .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
                .await
                .unwrap();
            assert_eq!(control, HandleControl::Continue);

            let regions = ctx
                .leader_region_registry
                .batch_get(vec![leader_id].into_iter());
            assert_eq!(
                regions.len(),
                1,
                "reported manifest version {reported_version}"
            );
            assert_eq!(
                regions.get(&leader_id),
                Some(&leader_on_datanode_1(expected_version)),
                "reported manifest version {reported_version}"
            );
        }
    }
}