1use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
23pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27 fn is_acceptable(&self, role: Role) -> bool {
28 role == Role::Datanode
29 }
30
31 async fn handle(
32 &self,
33 _req: &HeartbeatRequest,
34 ctx: &mut Context,
35 acc: &mut HeartbeatAccumulator,
36 ) -> Result<HandleControl> {
37 let Some(current_stat) = acc.stat.as_ref() else {
38 return Ok(HandleControl::Continue);
39 };
40
41 let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42 for stat in current_stat.region_stats.iter() {
43 if stat.role != RegionRole::Leader {
44 continue;
45 }
46
47 let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48 let value = LeaderRegion {
49 datanode_id: current_stat.id,
50 manifest,
51 };
52 key_values.push((stat.id, value));
53 }
54 ctx.leader_region_registry.batch_put(key_values);
55
56 Ok(HandleControl::Continue)
57 }
58}
59
#[cfg(test)]
mod tests {
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a "mito" [`RegionStat`] with the given id, manifest version and
    /// role; every other counter is zeroed.
    fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
        let region_manifest = RegionManifestInfo::Mito {
            manifest_version,
            flushed_entry_id: 0,
        };

        RegionStat {
            id,
            region_manifest,
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            engine: "mito".to_string(),
            role,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// The registry entry expected for a leader region hosted on datanode 1
    /// with the given manifest version.
    fn expected_leader(manifest_version: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id: 0,
                topic_latest_entry_id: 0,
            },
        }
    }

    /// Runs the handler once with a default request and checks that it asks
    /// the pipeline to continue.
    async fn send_heartbeat(ctx: &mut Context, acc: &mut HeartbeatAccumulator) {
        let outcome = CollectLeaderRegionHandler
            .handle(&HeartbeatRequest::default(), ctx, acc)
            .await
            .unwrap();
        assert_eq!(outcome, HandleControl::Continue);
    }

    #[tokio::test]
    async fn test_handle_collect_leader_region() {
        let leader_id = RegionId::new(1, 1);
        let follower_id = RegionId::new(1, 2);

        let env = TestEnv::new();
        let mut ctx = env.ctx();

        // Round 1: one leader and one follower are reported; only the leader
        // ends up in the registry.
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                region_stats: vec![
                    new_region_stat(leader_id, 1, RegionRole::Leader),
                    new_region_stat(follower_id, 2, RegionRole::Follower),
                ],
                addr: "127.0.0.1:0000".to_string(),
                region_num: 2,
                ..Default::default()
            }),
            ..Default::default()
        };
        send_heartbeat(&mut ctx, &mut acc).await;

        let found = ctx
            .leader_region_registry
            .batch_get(vec![leader_id, follower_id].into_iter());
        assert_eq!(found.len(), 1);
        assert_eq!(found.get(&leader_id), Some(&expected_leader(1)));

        // Round 2: the leader reports manifest version 2, which replaces the
        // previously stored version 1.
        acc.stat = Some(Stat {
            id: 1,
            region_stats: vec![new_region_stat(leader_id, 2, RegionRole::Leader)],
            timestamp_millis: 0,
            addr: "127.0.0.1:0000".to_string(),
            region_num: 1,
            node_epoch: 0,
            ..Default::default()
        });
        send_heartbeat(&mut ctx, &mut acc).await;

        let found = ctx
            .leader_region_registry
            .batch_get(vec![leader_id].into_iter());
        assert_eq!(found.len(), 1);
        assert_eq!(found.get(&leader_id), Some(&expected_leader(2)));

        // Round 3: a heartbeat carrying the older manifest version 1 arrives.
        acc.stat = Some(Stat {
            id: 1,
            region_stats: vec![new_region_stat(leader_id, 1, RegionRole::Leader)],
            timestamp_millis: 0,
            addr: "127.0.0.1:0000".to_string(),
            region_num: 1,
            node_epoch: 0,
            ..Default::default()
        });
        send_heartbeat(&mut ctx, &mut acc).await;

        let found = ctx
            .leader_region_registry
            .batch_get(vec![leader_id].into_iter());
        assert_eq!(found.len(), 1);
        // The registry entry is expected to stay at manifest version 2.
        assert_eq!(found.get(&leader_id), Some(&expected_leader(2)));
    }
}