1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29 pub datanode_id: u64,
30 pub manifest: LeaderRegionManifestInfo,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
34pub enum LeaderRegionManifestInfo {
35 Mito {
36 manifest_version: u64,
37 flushed_entry_id: u64,
38 topic_latest_entry_id: u64,
39 },
40 Metric {
41 data_manifest_version: u64,
42 data_flushed_entry_id: u64,
43 data_topic_latest_entry_id: u64,
44 metadata_manifest_version: u64,
45 metadata_flushed_entry_id: u64,
46 metadata_topic_latest_entry_id: u64,
47 },
48}
49
50impl LeaderRegionManifestInfo {
51 pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53 match region_stat.region_manifest {
54 RegionManifestInfo::Metric {
55 data_manifest_version,
56 data_flushed_entry_id,
57 metadata_manifest_version,
58 metadata_flushed_entry_id,
59 } => LeaderRegionManifestInfo::Metric {
60 data_manifest_version,
61 data_flushed_entry_id,
62 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63 metadata_manifest_version,
64 metadata_flushed_entry_id,
65 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66 },
67 RegionManifestInfo::Mito {
68 manifest_version,
69 flushed_entry_id,
70 file_removed_cnt: _,
71 } => LeaderRegionManifestInfo::Mito {
72 manifest_version,
73 flushed_entry_id,
74 topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
75 },
76 }
77 }
78
79 pub fn manifest_version(&self) -> u64 {
81 match self {
82 LeaderRegionManifestInfo::Mito {
83 manifest_version, ..
84 } => *manifest_version,
85 LeaderRegionManifestInfo::Metric {
86 data_manifest_version,
87 ..
88 } => *data_manifest_version,
89 }
90 }
91
92 pub fn flushed_entry_id(&self) -> u64 {
94 match self {
95 LeaderRegionManifestInfo::Mito {
96 flushed_entry_id, ..
97 } => *flushed_entry_id,
98 LeaderRegionManifestInfo::Metric {
99 data_flushed_entry_id,
100 ..
101 } => *data_flushed_entry_id,
102 }
103 }
104
105 pub fn prunable_entry_id(&self) -> u64 {
115 match self {
116 LeaderRegionManifestInfo::Mito {
117 flushed_entry_id,
118 topic_latest_entry_id,
119 ..
120 } => (*flushed_entry_id).max(*topic_latest_entry_id),
121 LeaderRegionManifestInfo::Metric {
122 data_flushed_entry_id,
123 data_topic_latest_entry_id,
124 metadata_flushed_entry_id,
125 metadata_topic_latest_entry_id,
126 ..
127 } => {
128 let data_prunable_entry_id =
129 (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
130 let metadata_prunable_entry_id =
131 (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
132 data_prunable_entry_id.min(metadata_prunable_entry_id)
133 }
134 }
135 }
136
137 pub fn replay_entry_id(&self) -> u64 {
139 match self {
140 LeaderRegionManifestInfo::Mito {
141 flushed_entry_id,
142 topic_latest_entry_id,
143 ..
144 } => (*flushed_entry_id).max(*topic_latest_entry_id),
145 LeaderRegionManifestInfo::Metric {
146 data_flushed_entry_id,
147 data_topic_latest_entry_id,
148 ..
149 } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
150 }
151 }
152
153 pub fn metadata_replay_entry_id(&self) -> Option<u64> {
155 match self {
156 LeaderRegionManifestInfo::Metric {
157 metadata_flushed_entry_id,
158 metadata_topic_latest_entry_id,
159 ..
160 } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
161 _ => None,
162 }
163 }
164
165 pub fn is_inactive(&self) -> bool {
172 match *self {
173 LeaderRegionManifestInfo::Mito {
174 flushed_entry_id,
175 topic_latest_entry_id,
176 ..
177 } => flushed_entry_id < topic_latest_entry_id,
178 LeaderRegionManifestInfo::Metric {
179 data_flushed_entry_id,
180 data_topic_latest_entry_id,
181 metadata_flushed_entry_id,
182 metadata_topic_latest_entry_id,
183 ..
184 } => {
185 data_flushed_entry_id < data_topic_latest_entry_id
186 || metadata_flushed_entry_id < metadata_topic_latest_entry_id
187 }
188 }
189 }
190}
191
192pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
193
194#[derive(Default)]
198pub struct LeaderRegionRegistry {
199 inner: RwLock<HashMap<RegionId, LeaderRegion>>,
200}
201
202impl LeaderRegionRegistry {
203 pub fn new() -> Self {
205 Self {
206 inner: RwLock::new(HashMap::new()),
207 }
208 }
209
210 pub fn batch_get<I: Iterator<Item = RegionId>>(
212 &self,
213 region_ids: I,
214 ) -> HashMap<RegionId, LeaderRegion> {
215 let inner = self.inner.read().unwrap();
216 region_ids
217 .into_iter()
218 .flat_map(|region_id| {
219 inner
220 .get(®ion_id)
221 .map(|leader_region| (region_id, *leader_region))
222 })
223 .collect::<HashMap<_, _>>()
224 }
225
226 pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
228 let mut inner = self.inner.write().unwrap();
229 for (region_id, leader_region) in key_values {
230 match inner.entry(region_id) {
231 Entry::Vacant(entry) => {
232 entry.insert(leader_region);
233 }
234 Entry::Occupied(mut entry) => {
235 let manifest_version = entry.get().manifest.manifest_version();
236 if manifest_version > leader_region.manifest.manifest_version() {
237 warn!(
238 "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
239 region_id,
240 manifest_version,
241 leader_region.manifest.manifest_version()
242 );
243 } else {
244 entry.insert(leader_region);
245 }
246 }
247 }
248 }
249 }
250
251 pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
252 let mut inner = self.inner.write().unwrap();
253 for region_id in region_ids {
254 inner.remove(®ion_id);
255 }
256 }
257
258 pub fn reset(&self) {
260 let mut inner = self.inner.write().unwrap();
261 inner.clear();
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Mito manifest info with `manifest_version` fixed to 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id,
            topic_latest_entry_id,
        }
    }

    /// Builds a Metric manifest info with both manifest versions fixed to 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_manifest_version: 1,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed, topic_latest, expected)
        let cases = [(10, 20, true), (20, 20, false), (30, 20, false)];
        for &(flushed, latest, expected) in cases.iter() {
            assert_eq!(
                mito_manifest_info(flushed, latest).is_inactive(),
                expected,
                "flushed={flushed}, latest={latest}"
            );
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            (5, 10, 20, 20, true),
            (10, 10, 15, 20, true),
            (1, 2, 3, 4, true),
            (10, 10, 20, 20, false),
            (30, 20, 40, 20, false),
        ];
        for &(df, dl, mf, ml, expected) in cases.iter() {
            assert_eq!(
                metric_manifest_info(df, dl, mf, ml).is_inactive(),
                expected,
                "data=({df}, {dl}), metadata=({mf}, {ml})"
            );
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed, topic_latest, expected)
        let cases = [(100, 120, 120), (150, 120, 150), (0, 0, 0)];
        for &(flushed, latest, expected) in cases.iter() {
            assert_eq!(
                mito_manifest_info(flushed, latest).prunable_entry_id(),
                expected,
                "flushed={flushed}, latest={latest}"
            );
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            (100, 120, 90, 110, 110),
            (200, 150, 180, 220, 200),
            (0, 0, 0, 0, 0),
        ];
        for &(df, dl, mf, ml, expected) in cases.iter() {
            assert_eq!(
                metric_manifest_info(df, dl, mf, ml).prunable_entry_id(),
                expected,
                "data=({df}, {dl}), metadata=({mf}, {ml})"
            );
        }
    }
}