meta_srv/handler/
collect_leader_region_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
23pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27    fn is_acceptable(&self, role: Role) -> bool {
28        role == Role::Datanode
29    }
30
31    async fn handle(
32        &self,
33        _req: &HeartbeatRequest,
34        ctx: &mut Context,
35        acc: &mut HeartbeatAccumulator,
36    ) -> Result<HandleControl> {
37        let Some(current_stat) = acc.stat.as_ref() else {
38            return Ok(HandleControl::Continue);
39        };
40
41        let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42        for stat in current_stat.region_stats.iter() {
43            if stat.role != RegionRole::Leader {
44                continue;
45            }
46
47            let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48            let value = LeaderRegion {
49                datanode_id: current_stat.id,
50                manifest,
51            };
52            key_values.push((stat.id, value));
53        }
54        ctx.leader_region_registry.batch_put(key_values);
55
56        Ok(HandleControl::Continue)
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
63    use common_meta::region_registry::LeaderRegionManifestInfo;
64    use store_api::region_engine::RegionRole;
65    use store_api::storage::RegionId;
66
67    use super::*;
68    use crate::handler::test_utils::TestEnv;
69
70    fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
71        RegionStat {
72            id,
73            region_manifest: RegionManifestInfo::Mito {
74                manifest_version,
75                flushed_entry_id: 0,
76                file_removed_cnt: 0,
77            },
78            rcus: 0,
79            wcus: 0,
80            approximate_bytes: 0,
81            engine: "mito".to_string(),
82            role,
83            num_rows: 0,
84            memtable_size: 0,
85            manifest_size: 0,
86            sst_size: 0,
87            sst_num: 0,
88            index_size: 0,
89            data_topic_latest_entry_id: 0,
90            metadata_topic_latest_entry_id: 0,
91            written_bytes: 0,
92        }
93    }
94
95    #[tokio::test]
96    async fn test_handle_collect_leader_region() {
97        let env = TestEnv::new();
98        let mut ctx = env.ctx();
99
100        let mut acc = HeartbeatAccumulator {
101            stat: Some(Stat {
102                id: 1,
103                region_stats: vec![
104                    new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader),
105                    new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower),
106                ],
107                addr: "127.0.0.1:0000".to_string(),
108                region_num: 2,
109                ..Default::default()
110            }),
111            ..Default::default()
112        };
113
114        let handler = CollectLeaderRegionHandler;
115        let control = handler
116            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
117            .await
118            .unwrap();
119
120        assert_eq!(control, HandleControl::Continue);
121        let regions = ctx
122            .leader_region_registry
123            .batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter());
124        assert_eq!(regions.len(), 1);
125        assert_eq!(
126            regions.get(&RegionId::new(1, 1)),
127            Some(&LeaderRegion {
128                datanode_id: 1,
129                manifest: LeaderRegionManifestInfo::Mito {
130                    manifest_version: 1,
131                    flushed_entry_id: 0,
132                    topic_latest_entry_id: 0,
133                },
134            })
135        );
136
137        // New heartbeat with new manifest version
138        acc.stat = Some(Stat {
139            id: 1,
140            region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)],
141            timestamp_millis: 0,
142            addr: "127.0.0.1:0000".to_string(),
143            region_num: 1,
144            node_epoch: 0,
145            ..Default::default()
146        });
147        let control = handler
148            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
149            .await
150            .unwrap();
151
152        assert_eq!(control, HandleControl::Continue);
153        let regions = ctx
154            .leader_region_registry
155            .batch_get(vec![RegionId::new(1, 1)].into_iter());
156        assert_eq!(regions.len(), 1);
157        assert_eq!(
158            regions.get(&RegionId::new(1, 1)),
159            Some(&LeaderRegion {
160                datanode_id: 1,
161                manifest: LeaderRegionManifestInfo::Mito {
162                    manifest_version: 2,
163                    flushed_entry_id: 0,
164                    topic_latest_entry_id: 0,
165                },
166            })
167        );
168
169        // New heartbeat with old manifest version
170        acc.stat = Some(Stat {
171            id: 1,
172            region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)],
173            timestamp_millis: 0,
174            addr: "127.0.0.1:0000".to_string(),
175            region_num: 1,
176            node_epoch: 0,
177            ..Default::default()
178        });
179        let control = handler
180            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
181            .await
182            .unwrap();
183
184        assert_eq!(control, HandleControl::Continue);
185        let regions = ctx
186            .leader_region_registry
187            .batch_get(vec![RegionId::new(1, 1)].into_iter());
188        assert_eq!(regions.len(), 1);
189        assert_eq!(
190            regions.get(&RegionId::new(1, 1)),
191            // The manifest version is not updated
192            Some(&LeaderRegion {
193                datanode_id: 1,
194                manifest: LeaderRegionManifestInfo::Mito {
195                    manifest_version: 2,
196                    flushed_entry_id: 0,
197                    topic_latest_entry_id: 0,
198                },
199            })
200        );
201    }
202}