1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29 pub datanode_id: u64,
30 pub manifest: LeaderRegionManifestInfo,
31}
32
/// Manifest version and entry-id bookkeeping of a leader region.
///
/// There is one variant per kind of region manifest. Values are normally
/// produced by [`LeaderRegionManifestInfo::from_region_stat`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
    /// Bookkeeping for a region that tracks a single set of values.
    Mito {
        /// Manifest version of the region.
        manifest_version: u64,
        /// Flushed entry id of the region.
        flushed_entry_id: u64,
        /// Latest entry id of the topic; `from_region_stat` fills it from
        /// `RegionStat::data_topic_latest_entry_id`.
        topic_latest_entry_id: u64,
    },
    /// Bookkeeping for a region that tracks two sets of values, one prefixed
    /// `data_` and one prefixed `metadata_`.
    Metric {
        /// Manifest version of the data part.
        data_manifest_version: u64,
        /// Flushed entry id of the data part.
        data_flushed_entry_id: u64,
        /// Latest topic entry id of the data part.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata part.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
        /// Latest topic entry id of the metadata part.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51 pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53 match region_stat.region_manifest {
54 RegionManifestInfo::Metric {
55 data_manifest_version,
56 data_flushed_entry_id,
57 metadata_manifest_version,
58 metadata_flushed_entry_id,
59 } => LeaderRegionManifestInfo::Metric {
60 data_manifest_version,
61 data_flushed_entry_id,
62 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63 metadata_manifest_version,
64 metadata_flushed_entry_id,
65 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66 },
67 RegionManifestInfo::Mito {
68 manifest_version,
69 flushed_entry_id,
70 } => LeaderRegionManifestInfo::Mito {
71 manifest_version,
72 flushed_entry_id,
73 topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
74 },
75 }
76 }
77
78 pub fn manifest_version(&self) -> u64 {
80 match self {
81 LeaderRegionManifestInfo::Mito {
82 manifest_version, ..
83 } => *manifest_version,
84 LeaderRegionManifestInfo::Metric {
85 data_manifest_version,
86 ..
87 } => *data_manifest_version,
88 }
89 }
90
91 pub fn flushed_entry_id(&self) -> u64 {
93 match self {
94 LeaderRegionManifestInfo::Mito {
95 flushed_entry_id, ..
96 } => *flushed_entry_id,
97 LeaderRegionManifestInfo::Metric {
98 data_flushed_entry_id,
99 ..
100 } => *data_flushed_entry_id,
101 }
102 }
103
104 pub fn prunable_entry_id(&self) -> u64 {
114 match self {
115 LeaderRegionManifestInfo::Mito {
116 flushed_entry_id,
117 topic_latest_entry_id,
118 ..
119 } => (*flushed_entry_id).max(*topic_latest_entry_id),
120 LeaderRegionManifestInfo::Metric {
121 data_flushed_entry_id,
122 data_topic_latest_entry_id,
123 metadata_flushed_entry_id,
124 metadata_topic_latest_entry_id,
125 ..
126 } => {
127 let data_prunable_entry_id =
128 (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
129 let metadata_prunable_entry_id =
130 (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
131 data_prunable_entry_id.min(metadata_prunable_entry_id)
132 }
133 }
134 }
135
136 pub fn replay_entry_id(&self) -> u64 {
138 match self {
139 LeaderRegionManifestInfo::Mito {
140 flushed_entry_id,
141 topic_latest_entry_id,
142 ..
143 } => (*flushed_entry_id).max(*topic_latest_entry_id),
144 LeaderRegionManifestInfo::Metric {
145 data_flushed_entry_id,
146 data_topic_latest_entry_id,
147 ..
148 } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
149 }
150 }
151
152 pub fn metadata_replay_entry_id(&self) -> Option<u64> {
154 match self {
155 LeaderRegionManifestInfo::Metric {
156 metadata_flushed_entry_id,
157 metadata_topic_latest_entry_id,
158 ..
159 } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
160 _ => None,
161 }
162 }
163
164 pub fn is_inactive(&self) -> bool {
171 match *self {
172 LeaderRegionManifestInfo::Mito {
173 flushed_entry_id,
174 topic_latest_entry_id,
175 ..
176 } => flushed_entry_id < topic_latest_entry_id,
177 LeaderRegionManifestInfo::Metric {
178 data_flushed_entry_id,
179 data_topic_latest_entry_id,
180 metadata_flushed_entry_id,
181 metadata_topic_latest_entry_id,
182 ..
183 } => {
184 data_flushed_entry_id < data_topic_latest_entry_id
185 || metadata_flushed_entry_id < metadata_topic_latest_entry_id
186 }
187 }
188 }
189}
190
191pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
192
193#[derive(Default)]
197pub struct LeaderRegionRegistry {
198 inner: RwLock<HashMap<RegionId, LeaderRegion>>,
199}
200
201impl LeaderRegionRegistry {
202 pub fn new() -> Self {
204 Self {
205 inner: RwLock::new(HashMap::new()),
206 }
207 }
208
209 pub fn batch_get<I: Iterator<Item = RegionId>>(
211 &self,
212 region_ids: I,
213 ) -> HashMap<RegionId, LeaderRegion> {
214 let inner = self.inner.read().unwrap();
215 region_ids
216 .into_iter()
217 .flat_map(|region_id| {
218 inner
219 .get(®ion_id)
220 .map(|leader_region| (region_id, *leader_region))
221 })
222 .collect::<HashMap<_, _>>()
223 }
224
225 pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
227 let mut inner = self.inner.write().unwrap();
228 for (region_id, leader_region) in key_values {
229 match inner.entry(region_id) {
230 Entry::Vacant(entry) => {
231 entry.insert(leader_region);
232 }
233 Entry::Occupied(mut entry) => {
234 let manifest_version = entry.get().manifest.manifest_version();
235 if manifest_version > leader_region.manifest.manifest_version() {
236 warn!(
237 "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
238 region_id,
239 manifest_version,
240 leader_region.manifest.manifest_version()
241 );
242 } else {
243 entry.insert(leader_region);
244 }
245 }
246 }
247 }
248 }
249
250 pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
251 let mut inner = self.inner.write().unwrap();
252 for region_id in region_ids {
253 inner.remove(®ion_id);
254 }
255 }
256
257 pub fn reset(&self) {
259 let mut inner = self.inner.write().unwrap();
260 inner.clear();
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Mito manifest info with a fixed manifest version of 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            flushed_entry_id,
            topic_latest_entry_id,
            manifest_version: 1,
        }
    }

    /// Builds a Metric manifest info with both manifest versions fixed to 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
            data_manifest_version: 1,
            metadata_manifest_version: 1,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed, topic_latest, expected)
        let cases = [(10, 20, true), (20, 20, false), (30, 20, false)];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(
                info.is_inactive(),
                expected,
                "flushed={}, latest={}",
                flushed,
                latest
            );
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            // Only the data part lags behind.
            (5, 10, 20, 20, true),
            // Only the metadata part lags behind.
            (10, 10, 15, 20, true),
            // Both parts lag behind.
            (1, 2, 3, 4, true),
            // Both parts are exactly caught up.
            (10, 10, 20, 20, false),
            // Both parts are ahead of the topic.
            (30, 20, 40, 20, false),
        ];
        for (df, dl, mf, ml, expected) in cases {
            let info = metric_manifest_info(df, dl, mf, ml);
            assert_eq!(
                info.is_inactive(),
                expected,
                "data=({}, {}), metadata=({}, {})",
                df,
                dl,
                mf,
                ml
            );
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed, topic_latest, expected)
        let cases = [(100, 120, 120), (150, 120, 150), (0, 0, 0)];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            // data -> 120, metadata -> 110, min -> 110
            (100, 120, 90, 110, 110),
            // data -> 200, metadata -> 220, min -> 200
            (200, 150, 180, 220, 200),
            (0, 0, 0, 0, 0),
        ];
        for (df, dl, mf, ml, expected) in cases {
            let info = metric_manifest_info(df, dl, mf, ml);
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }

    #[test]
    fn test_replay_entry_id() {
        assert_eq!(mito_manifest_info(100, 120).replay_entry_id(), 120);
        assert_eq!(mito_manifest_info(150, 120).replay_entry_id(), 150);
        // A Metric region only looks at its data part.
        assert_eq!(metric_manifest_info(100, 120, 90, 110).replay_entry_id(), 120);
        assert_eq!(metric_manifest_info(200, 150, 180, 220).replay_entry_id(), 200);
    }

    #[test]
    fn test_metadata_replay_entry_id() {
        assert_eq!(mito_manifest_info(100, 120).metadata_replay_entry_id(), None);
        assert_eq!(
            metric_manifest_info(100, 120, 90, 110).metadata_replay_entry_id(),
            Some(110)
        );
        assert_eq!(
            metric_manifest_info(200, 150, 180, 220).metadata_replay_entry_id(),
            Some(220)
        );
    }

    #[test]
    fn test_version_and_flushed_accessors() {
        let mito = mito_manifest_info(10, 20);
        assert_eq!(mito.manifest_version(), 1);
        assert_eq!(mito.flushed_entry_id(), 10);

        // A Metric region reports its data part.
        let metric = metric_manifest_info(5, 10, 20, 20);
        assert_eq!(metric.manifest_version(), 1);
        assert_eq!(metric.flushed_entry_id(), 5);
    }
}