1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29 pub datanode_id: u64,
30 pub manifest: LeaderRegionManifestInfo,
31}
32
/// Manifest version and entry-id bookkeeping of a leader region.
///
/// There is one variant per shape of `RegionManifestInfo`: `Mito` tracks a
/// single set of counters, `Metric` tracks one set for the data part and one
/// for the metadata part.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum LeaderRegionManifestInfo {
    /// Bookkeeping for a region described by `RegionManifestInfo::Mito`.
    Mito {
        /// Manifest version of the region.
        manifest_version: u64,
        /// Flushed entry id of the region.
        flushed_entry_id: u64,
        /// Latest entry id of the region's topic; `from_region_stat` fills it
        /// from `RegionStat::data_topic_latest_entry_id`.
        topic_latest_entry_id: u64,
    },
    /// Bookkeeping for a region described by `RegionManifestInfo::Metric`.
    Metric {
        /// Manifest version of the data part.
        data_manifest_version: u64,
        /// Flushed entry id of the data part.
        data_flushed_entry_id: u64,
        /// Latest topic entry id of the data part.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata part.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
        /// Latest topic entry id of the metadata part.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51 pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53 match region_stat.region_manifest {
54 RegionManifestInfo::Metric {
55 data_manifest_version,
56 data_flushed_entry_id,
57 metadata_manifest_version,
58 metadata_flushed_entry_id,
59 } => LeaderRegionManifestInfo::Metric {
60 data_manifest_version,
61 data_flushed_entry_id,
62 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63 metadata_manifest_version,
64 metadata_flushed_entry_id,
65 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66 },
67 RegionManifestInfo::Mito {
68 manifest_version,
69 flushed_entry_id,
70 } => LeaderRegionManifestInfo::Mito {
71 manifest_version,
72 flushed_entry_id,
73 topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
74 },
75 }
76 }
77
78 pub fn manifest_version(&self) -> u64 {
80 match self {
81 LeaderRegionManifestInfo::Mito {
82 manifest_version, ..
83 } => *manifest_version,
84 LeaderRegionManifestInfo::Metric {
85 data_manifest_version,
86 ..
87 } => *data_manifest_version,
88 }
89 }
90
91 pub fn flushed_entry_id(&self) -> u64 {
93 match self {
94 LeaderRegionManifestInfo::Mito {
95 flushed_entry_id, ..
96 } => *flushed_entry_id,
97 LeaderRegionManifestInfo::Metric {
98 data_flushed_entry_id,
99 ..
100 } => *data_flushed_entry_id,
101 }
102 }
103
104 pub fn prunable_entry_id(&self) -> u64 {
114 match self {
115 LeaderRegionManifestInfo::Mito {
116 flushed_entry_id,
117 topic_latest_entry_id,
118 ..
119 } => (*flushed_entry_id).max(*topic_latest_entry_id),
120 LeaderRegionManifestInfo::Metric {
121 data_flushed_entry_id,
122 data_topic_latest_entry_id,
123 metadata_flushed_entry_id,
124 metadata_topic_latest_entry_id,
125 ..
126 } => {
127 let data_prunable_entry_id =
128 (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
129 let metadata_prunable_entry_id =
130 (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
131 data_prunable_entry_id.min(metadata_prunable_entry_id)
132 }
133 }
134 }
135
136 pub fn is_inactive(&self) -> bool {
143 match *self {
144 LeaderRegionManifestInfo::Mito {
145 flushed_entry_id,
146 topic_latest_entry_id,
147 ..
148 } => flushed_entry_id < topic_latest_entry_id,
149 LeaderRegionManifestInfo::Metric {
150 data_flushed_entry_id,
151 data_topic_latest_entry_id,
152 metadata_flushed_entry_id,
153 metadata_topic_latest_entry_id,
154 ..
155 } => {
156 data_flushed_entry_id < data_topic_latest_entry_id
157 || metadata_flushed_entry_id < metadata_topic_latest_entry_id
158 }
159 }
160 }
161}
162
163pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
164
165#[derive(Default)]
169pub struct LeaderRegionRegistry {
170 inner: RwLock<HashMap<RegionId, LeaderRegion>>,
171}
172
173impl LeaderRegionRegistry {
174 pub fn new() -> Self {
176 Self {
177 inner: RwLock::new(HashMap::new()),
178 }
179 }
180
181 pub fn batch_get<I: Iterator<Item = RegionId>>(
183 &self,
184 region_ids: I,
185 ) -> HashMap<RegionId, LeaderRegion> {
186 let inner = self.inner.read().unwrap();
187 region_ids
188 .into_iter()
189 .flat_map(|region_id| {
190 inner
191 .get(®ion_id)
192 .map(|leader_region| (region_id, *leader_region))
193 })
194 .collect::<HashMap<_, _>>()
195 }
196
197 pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
199 let mut inner = self.inner.write().unwrap();
200 for (region_id, leader_region) in key_values {
201 match inner.entry(region_id) {
202 Entry::Vacant(entry) => {
203 entry.insert(leader_region);
204 }
205 Entry::Occupied(mut entry) => {
206 let manifest_version = entry.get().manifest.manifest_version();
207 if manifest_version > leader_region.manifest.manifest_version() {
208 warn!(
209 "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
210 region_id,
211 manifest_version,
212 leader_region.manifest.manifest_version()
213 );
214 } else {
215 entry.insert(leader_region);
216 }
217 }
218 }
219 }
220 }
221
222 pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
223 let mut inner = self.inner.write().unwrap();
224 for region_id in region_ids {
225 inner.remove(®ion_id);
226 }
227 }
228
229 pub fn reset(&self) {
231 let mut inner = self.inner.write().unwrap();
232 inner.clear();
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Mito` manifest info with the given entry ids and manifest version 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id,
            topic_latest_entry_id,
        }
    }

    /// Builds a `Metric` manifest info with the given entry ids and both
    /// manifest versions set to 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_manifest_version: 1,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected)
        let cases = [
            // Flushed id behind the topic: inactive.
            (10, 20, true),
            // Flushed id equal to the topic: not inactive.
            (20, 20, false),
            // Flushed id ahead of the topic: not inactive.
            (30, 20, false),
        ];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(
                info.is_inactive(),
                expected,
                "flushed: {}, latest: {}",
                flushed,
                latest
            );
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            // Only the data part is behind.
            (5, 10, 20, 20, true),
            // Only the metadata part is behind.
            (10, 10, 15, 20, true),
            // Both parts are behind.
            (1, 2, 3, 4, true),
            // Both parts are exactly caught up.
            (10, 10, 20, 20, false),
            // Both parts are ahead of their topics.
            (30, 20, 40, 20, false),
        ];
        for (data_flushed, data_latest, metadata_flushed, metadata_latest, expected) in cases {
            let info = metric_manifest_info(
                data_flushed,
                data_latest,
                metadata_flushed,
                metadata_latest,
            );
            assert_eq!(
                info.is_inactive(),
                expected,
                "data: ({}, {}), metadata: ({}, {})",
                data_flushed,
                data_latest,
                metadata_flushed,
                metadata_latest
            );
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected prunable id)
        let cases = [
            // The topic latest id is larger.
            (100, 120, 120),
            // The flushed id is larger.
            (150, 120, 150),
            // Everything at zero.
            (0, 0, 0),
        ];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // (data_flushed, data_latest, metadata_flushed, metadata_latest, expected)
        let cases = [
            // data: max(100, 120) = 120, metadata: max(90, 110) = 110, min = 110.
            (100, 120, 90, 110, 110),
            // data: max(200, 150) = 200, metadata: max(180, 220) = 220, min = 200.
            (200, 150, 180, 220, 200),
            // Everything at zero.
            (0, 0, 0, 0, 0),
        ];
        for (data_flushed, data_latest, metadata_flushed, metadata_latest, expected) in cases {
            let info = metric_manifest_info(
                data_flushed,
                data_latest,
                metadata_flushed,
                metadata_latest,
            );
            assert_eq!(info.prunable_entry_id(), expected);
        }
    }
}