1use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
23pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27 fn is_acceptable(&self, role: Role) -> bool {
28 role == Role::Datanode
29 }
30
31 async fn handle(
32 &self,
33 _req: &HeartbeatRequest,
34 ctx: &mut Context,
35 acc: &mut HeartbeatAccumulator,
36 ) -> Result<HandleControl> {
37 let Some(current_stat) = acc.stat.as_ref() else {
38 return Ok(HandleControl::Continue);
39 };
40
41 let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42 for stat in current_stat.region_stats.iter() {
43 if stat.role != RegionRole::Leader {
44 continue;
45 }
46
47 let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48 let value = LeaderRegion {
49 datanode_id: current_stat.id,
50 manifest,
51 };
52 key_values.push((stat.id, value));
53 }
54 ctx.leader_region_registry.batch_put(key_values);
55
56 Ok(HandleControl::Continue)
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use std::sync::Arc;
63
64 use common_meta::cache_invalidator::DummyCacheInvalidator;
65 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
66 use common_meta::key::TableMetadataManager;
67 use common_meta::kv_backend::memory::MemoryKvBackend;
68 use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
69 use common_meta::sequence::SequenceBuilder;
70 use store_api::region_engine::RegionRole;
71 use store_api::storage::RegionId;
72
73 use super::*;
74 use crate::cluster::MetaPeerClientBuilder;
75 use crate::handler::{HeartbeatMailbox, Pushers};
76 use crate::service::store::cached_kv::LeaderCachedKvBackend;
77
78 fn mock_ctx() -> Context {
79 let in_memory = Arc::new(MemoryKvBackend::new());
80 let kv_backend = Arc::new(MemoryKvBackend::new());
81 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
82 kv_backend.clone(),
83 ));
84 let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
85 let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
86 let meta_peer_client = MetaPeerClientBuilder::default()
87 .election(None)
88 .in_memory(in_memory.clone())
89 .build()
90 .map(Arc::new)
91 .unwrap();
93 Context {
94 server_addr: "127.0.0.1:0000".to_string(),
95 in_memory,
96 kv_backend: kv_backend.clone(),
97 leader_cached_kv_backend,
98 meta_peer_client,
99 mailbox,
100 election: None,
101 is_infancy: false,
102 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
103 cache_invalidator: Arc::new(DummyCacheInvalidator),
104 leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
105 }
106 }
107
108 fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
109 RegionStat {
110 id,
111 region_manifest: RegionManifestInfo::Mito {
112 manifest_version,
113 flushed_entry_id: 0,
114 },
115 rcus: 0,
116 wcus: 0,
117 approximate_bytes: 0,
118 engine: "mito".to_string(),
119 role,
120 num_rows: 0,
121 memtable_size: 0,
122 manifest_size: 0,
123 sst_size: 0,
124 index_size: 0,
125 data_topic_latest_entry_id: 0,
126 metadata_topic_latest_entry_id: 0,
127 }
128 }
129
130 #[tokio::test]
131 async fn test_handle_collect_leader_region() {
132 let mut ctx = mock_ctx();
133
134 let mut acc = HeartbeatAccumulator {
135 stat: Some(Stat {
136 id: 1,
137 region_stats: vec![
138 new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader),
139 new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower),
140 ],
141 addr: "127.0.0.1:0000".to_string(),
142 region_num: 2,
143 ..Default::default()
144 }),
145 ..Default::default()
146 };
147
148 let handler = CollectLeaderRegionHandler;
149 let control = handler
150 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
151 .await
152 .unwrap();
153
154 assert_eq!(control, HandleControl::Continue);
155 let regions = ctx
156 .leader_region_registry
157 .batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter());
158 assert_eq!(regions.len(), 1);
159 assert_eq!(
160 regions.get(&RegionId::new(1, 1)),
161 Some(&LeaderRegion {
162 datanode_id: 1,
163 manifest: LeaderRegionManifestInfo::Mito {
164 manifest_version: 1,
165 flushed_entry_id: 0,
166 topic_latest_entry_id: 0,
167 },
168 })
169 );
170
171 acc.stat = Some(Stat {
173 id: 1,
174 region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)],
175 timestamp_millis: 0,
176 addr: "127.0.0.1:0000".to_string(),
177 region_num: 1,
178 node_epoch: 0,
179 ..Default::default()
180 });
181 let control = handler
182 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
183 .await
184 .unwrap();
185
186 assert_eq!(control, HandleControl::Continue);
187 let regions = ctx
188 .leader_region_registry
189 .batch_get(vec![RegionId::new(1, 1)].into_iter());
190 assert_eq!(regions.len(), 1);
191 assert_eq!(
192 regions.get(&RegionId::new(1, 1)),
193 Some(&LeaderRegion {
194 datanode_id: 1,
195 manifest: LeaderRegionManifestInfo::Mito {
196 manifest_version: 2,
197 flushed_entry_id: 0,
198 topic_latest_entry_id: 0,
199 },
200 })
201 );
202
203 acc.stat = Some(Stat {
205 id: 1,
206 region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)],
207 timestamp_millis: 0,
208 addr: "127.0.0.1:0000".to_string(),
209 region_num: 1,
210 node_epoch: 0,
211 ..Default::default()
212 });
213 let control = handler
214 .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
215 .await
216 .unwrap();
217
218 assert_eq!(control, HandleControl::Continue);
219 let regions = ctx
220 .leader_region_registry
221 .batch_get(vec![RegionId::new(1, 1)].into_iter());
222 assert_eq!(regions.len(), 1);
223 assert_eq!(
224 regions.get(&RegionId::new(1, 1)),
225 Some(&LeaderRegion {
227 datanode_id: 1,
228 manifest: LeaderRegionManifestInfo::Mito {
229 manifest_version: 2,
230 flushed_entry_id: 0,
231 topic_latest_entry_id: 0,
232 },
233 })
234 );
235 }
236}