1use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
23pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27 fn is_acceptable(&self, role: Role) -> bool {
28 role == Role::Datanode
29 }
30
31 async fn handle(
32 &self,
33 _req: &HeartbeatRequest,
34 ctx: &mut Context,
35 acc: &mut HeartbeatAccumulator,
36 ) -> Result<HandleControl> {
37 let Some(current_stat) = acc.stat.as_ref() else {
38 return Ok(HandleControl::Continue);
39 };
40
41 let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42 for stat in current_stat.region_stats.iter() {
43 if stat.role != RegionRole::Leader {
44 continue;
45 }
46
47 let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48 let value = LeaderRegion {
49 datanode_id: current_stat.id,
50 manifest,
51 };
52 key_values.push((stat.id, value));
53 }
54 ctx.leader_region_registry.batch_put(key_values);
55
56 Ok(HandleControl::Continue)
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
63 use common_meta::region_registry::LeaderRegionManifestInfo;
64 use store_api::region_engine::RegionRole;
65 use store_api::storage::RegionId;
66
67 use super::*;
68 use crate::handler::test_utils::TestEnv;
69
70 fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
71 RegionStat {
72 id,
73 region_manifest: RegionManifestInfo::Mito {
74 manifest_version,
75 flushed_entry_id: 0,
76 file_removed_cnt: 0,
77 },
78 rcus: 0,
79 wcus: 0,
80 approximate_bytes: 0,
81 engine: "mito".to_string(),
82 role,
83 num_rows: 0,
84 memtable_size: 0,
85 manifest_size: 0,
86 sst_size: 0,
87 sst_num: 0,
88 index_size: 0,
89 data_topic_latest_entry_id: 0,
90 metadata_topic_latest_entry_id: 0,
91 written_bytes: 0,
92 }
93 }
94
95 #[tokio::test]
96 async fn test_handle_collect_leader_region() {
97 let env = TestEnv::new();
98 let mut ctx = env.ctx();
99
100 let mut acc = HeartbeatAccumulator {
101 stat: Some(Stat {
102 id: 1,
103 region_stats: vec![
104 new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader),
105 new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower),
106 ],
107 addr: "127.0.0.1:0000".to_string(),
108 region_num: 2,
109 ..Default::default()
110 }),
111 ..Default::default()
112 };
113
114 let handler = CollectLeaderRegionHandler;
115 let control = handler
116 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
117 .await
118 .unwrap();
119
120 assert_eq!(control, HandleControl::Continue);
121 let regions = ctx
122 .leader_region_registry
123 .batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter());
124 assert_eq!(regions.len(), 1);
125 assert_eq!(
126 regions.get(&RegionId::new(1, 1)),
127 Some(&LeaderRegion {
128 datanode_id: 1,
129 manifest: LeaderRegionManifestInfo::Mito {
130 manifest_version: 1,
131 flushed_entry_id: 0,
132 topic_latest_entry_id: 0,
133 },
134 })
135 );
136
137 acc.stat = Some(Stat {
139 id: 1,
140 region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)],
141 timestamp_millis: 0,
142 addr: "127.0.0.1:0000".to_string(),
143 region_num: 1,
144 node_epoch: 0,
145 ..Default::default()
146 });
147 let control = handler
148 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
149 .await
150 .unwrap();
151
152 assert_eq!(control, HandleControl::Continue);
153 let regions = ctx
154 .leader_region_registry
155 .batch_get(vec![RegionId::new(1, 1)].into_iter());
156 assert_eq!(regions.len(), 1);
157 assert_eq!(
158 regions.get(&RegionId::new(1, 1)),
159 Some(&LeaderRegion {
160 datanode_id: 1,
161 manifest: LeaderRegionManifestInfo::Mito {
162 manifest_version: 2,
163 flushed_entry_id: 0,
164 topic_latest_entry_id: 0,
165 },
166 })
167 );
168
169 acc.stat = Some(Stat {
171 id: 1,
172 region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)],
173 timestamp_millis: 0,
174 addr: "127.0.0.1:0000".to_string(),
175 region_num: 1,
176 node_epoch: 0,
177 ..Default::default()
178 });
179 let control = handler
180 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
181 .await
182 .unwrap();
183
184 assert_eq!(control, HandleControl::Continue);
185 let regions = ctx
186 .leader_region_registry
187 .batch_get(vec![RegionId::new(1, 1)].into_iter());
188 assert_eq!(regions.len(), 1);
189 assert_eq!(
190 regions.get(&RegionId::new(1, 1)),
191 Some(&LeaderRegion {
193 datanode_id: 1,
194 manifest: LeaderRegionManifestInfo::Mito {
195 manifest_version: 2,
196 flushed_entry_id: 0,
197 topic_latest_entry_id: 0,
198 },
199 })
200 );
201 }
202}