1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::{debug, warn};
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
/// Information about the leader of a region.
///
/// Pairs a datanode id with the manifest state recorded for the region.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
    /// Id of the datanode associated with this leader region
    /// (presumably the node where the leader lives — confirm against callers).
    pub datanode_id: u64,
    /// Manifest state of the leader region.
    pub manifest: LeaderRegionManifestInfo,
}
32
/// Manifest state of a leader region, with one variant per kind of region.
///
/// All fields are plain `u64` counters, so the type is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
    /// A region described by a single manifest version and one pair of entry ids.
    Mito {
        /// Manifest version; this is what `manifest_version()` returns.
        manifest_version: u64,
        /// Flushed entry id; this is what `flushed_entry_id()` returns.
        flushed_entry_id: u64,
        /// Latest entry id of the topic, filled from
        /// `RegionStat::data_topic_latest_entry_id` by `from_region_stat`.
        topic_latest_entry_id: u64,
    },
    /// A region that tracks a data part and a metadata part separately.
    Metric {
        /// Manifest version of the data part; this is what `manifest_version()` returns.
        data_manifest_version: u64,
        /// Flushed entry id of the data part; this is what `flushed_entry_id()` returns.
        data_flushed_entry_id: u64,
        /// Latest topic entry id of the data part.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata part.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
        /// Latest topic entry id of the metadata part.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51 pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53 match region_stat.region_manifest {
54 RegionManifestInfo::Metric {
55 data_manifest_version,
56 data_flushed_entry_id,
57 metadata_manifest_version,
58 metadata_flushed_entry_id,
59 } => LeaderRegionManifestInfo::Metric {
60 data_manifest_version,
61 data_flushed_entry_id,
62 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63 metadata_manifest_version,
64 metadata_flushed_entry_id,
65 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66 },
67 RegionManifestInfo::Mito {
68 manifest_version,
69 flushed_entry_id,
70 file_removed_cnt: _,
71 } => LeaderRegionManifestInfo::Mito {
72 manifest_version,
73 flushed_entry_id,
74 topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
75 },
76 }
77 }
78
79 pub fn manifest_version(&self) -> u64 {
81 match self {
82 LeaderRegionManifestInfo::Mito {
83 manifest_version, ..
84 } => *manifest_version,
85 LeaderRegionManifestInfo::Metric {
86 data_manifest_version,
87 ..
88 } => *data_manifest_version,
89 }
90 }
91
92 pub fn flushed_entry_id(&self) -> u64 {
94 match self {
95 LeaderRegionManifestInfo::Mito {
96 flushed_entry_id, ..
97 } => *flushed_entry_id,
98 LeaderRegionManifestInfo::Metric {
99 data_flushed_entry_id,
100 ..
101 } => *data_flushed_entry_id,
102 }
103 }
104
105 pub fn prunable_entry_id(&self) -> u64 {
115 match self {
116 LeaderRegionManifestInfo::Mito {
117 flushed_entry_id,
118 topic_latest_entry_id,
119 ..
120 } => (*flushed_entry_id).max(*topic_latest_entry_id),
121 LeaderRegionManifestInfo::Metric {
122 data_flushed_entry_id,
123 data_topic_latest_entry_id,
124 metadata_flushed_entry_id,
125 metadata_topic_latest_entry_id,
126 ..
127 } => {
128 let data_prunable_entry_id =
129 (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
130 let metadata_prunable_entry_id =
131 (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
132 data_prunable_entry_id.min(metadata_prunable_entry_id)
133 }
134 }
135 }
136
137 pub fn replay_entry_id(&self) -> u64 {
139 match self {
140 LeaderRegionManifestInfo::Mito {
141 flushed_entry_id,
142 topic_latest_entry_id,
143 ..
144 } => (*flushed_entry_id).max(*topic_latest_entry_id),
145 LeaderRegionManifestInfo::Metric {
146 data_flushed_entry_id,
147 data_topic_latest_entry_id,
148 ..
149 } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
150 }
151 }
152
153 pub fn metadata_replay_entry_id(&self) -> Option<u64> {
155 match self {
156 LeaderRegionManifestInfo::Metric {
157 metadata_flushed_entry_id,
158 metadata_topic_latest_entry_id,
159 ..
160 } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
161 _ => None,
162 }
163 }
164
165 pub fn is_inactive(&self) -> bool {
172 match *self {
173 LeaderRegionManifestInfo::Mito {
174 flushed_entry_id,
175 topic_latest_entry_id,
176 ..
177 } => flushed_entry_id < topic_latest_entry_id,
178 LeaderRegionManifestInfo::Metric {
179 data_flushed_entry_id,
180 data_topic_latest_entry_id,
181 metadata_flushed_entry_id,
182 metadata_topic_latest_entry_id,
183 ..
184 } => {
185 data_flushed_entry_id < data_topic_latest_entry_id
186 || metadata_flushed_entry_id < metadata_topic_latest_entry_id
187 }
188 }
189 }
190}
191
/// Reference-counted ([`Arc`]) handle to a [`LeaderRegionRegistry`].
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
193
/// In-memory registry mapping each [`RegionId`] to its [`LeaderRegion`].
///
/// The map is wrapped in an [`RwLock`], so every method takes `&self`.
#[derive(Default)]
pub struct LeaderRegionRegistry {
    /// Leader information keyed by region id.
    inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}
201
202impl LeaderRegionRegistry {
203 pub fn new() -> Self {
205 Self {
206 inner: RwLock::new(HashMap::new()),
207 }
208 }
209
210 pub fn batch_get<I: Iterator<Item = RegionId>>(
212 &self,
213 region_ids: I,
214 ) -> HashMap<RegionId, LeaderRegion> {
215 let inner = self.inner.read().unwrap();
216 region_ids
217 .into_iter()
218 .flat_map(|region_id| {
219 inner
220 .get(®ion_id)
221 .map(|leader_region| (region_id, *leader_region))
222 })
223 .collect::<HashMap<_, _>>()
224 }
225
226 pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
228 let mut inner = self.inner.write().unwrap();
229 for (region_id, leader_region) in key_values {
230 match inner.entry(region_id) {
231 Entry::Vacant(entry) => {
232 entry.insert(leader_region);
233 }
234 Entry::Occupied(mut entry) => {
235 let manifest_version = entry.get().manifest.manifest_version();
236 if manifest_version > leader_region.manifest.manifest_version() {
237 warn!(
238 "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
239 region_id,
240 manifest_version,
241 leader_region.manifest.manifest_version()
242 );
243 } else {
244 debug!(
245 "Updating leader region for region {}, pruned entry id: {}",
246 region_id,
247 leader_region.manifest.prunable_entry_id(),
248 );
249 entry.insert(leader_region);
250 }
251 }
252 }
253 }
254 }
255
256 pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
257 let mut inner = self.inner.write().unwrap();
258 for region_id in region_ids {
259 inner.remove(®ion_id);
260 }
261 }
262
263 pub fn reset(&self) {
265 let mut inner = self.inner.write().unwrap();
266 inner.clear();
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Mito` manifest info with the given entry ids and manifest version 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id,
            topic_latest_entry_id,
        }
    }

    /// Builds a `Metric` manifest info with the given entry ids and both
    /// manifest versions set to 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_manifest_version: 1,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected is_inactive)
        let cases = [
            // Flushed entry id is behind the topic's latest entry id.
            (10, 20, true),
            // Flushed entry id equals the topic's latest entry id.
            (20, 20, false),
            // Flushed entry id is ahead of the topic's latest entry id.
            (30, 20, false),
        ];
        for &(flushed, latest, expected) in &cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(info.is_inactive(), expected);
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected is_inactive)
        let cases = [
            // Only the data part is behind.
            (5, 10, 20, 20, true),
            // Only the metadata part is behind.
            (10, 10, 15, 20, true),
            // Both parts are behind.
            (1, 2, 3, 4, true),
            // Both parts are exactly caught up.
            (10, 10, 20, 20, false),
            // Both parts are ahead.
            (30, 20, 40, 20, false),
        ];
        for &(data_flushed, data_latest, meta_flushed, meta_latest, expected) in &cases {
            let info = metric_manifest_info(data_flushed, data_latest, meta_flushed, meta_latest);
            assert_eq!(info.is_inactive(), expected);
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected prunable_entry_id)
        let cases = [(100, 120, 120), (150, 120, 150), (0, 0, 0)];
        for &(flushed, latest, expected) in &cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected prunable_entry_id)
        let cases = [
            // max(100, 120) = 120, max(90, 110) = 110, min = 110
            (100, 120, 90, 110, 110),
            // max(200, 150) = 200, max(180, 220) = 220, min = 200
            (200, 150, 180, 220, 200),
            (0, 0, 0, 0, 0),
        ];
        for &(data_flushed, data_latest, meta_flushed, meta_latest, expected) in &cases {
            let info = metric_manifest_info(data_flushed, data_latest, meta_flushed, meta_latest);
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }
}