common_meta/
region_registry.rs1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29 pub datanode_id: u64,
30 pub manifest: LeaderRegionManifestInfo,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
34pub enum LeaderRegionManifestInfo {
35 Mito {
36 manifest_version: u64,
37 flushed_entry_id: u64,
38 topic_latest_entry_id: u64,
39 },
40 Metric {
41 data_manifest_version: u64,
42 data_flushed_entry_id: u64,
43 data_topic_latest_entry_id: u64,
44 metadata_manifest_version: u64,
45 metadata_flushed_entry_id: u64,
46 metadata_topic_latest_entry_id: u64,
47 },
48}
49
50impl LeaderRegionManifestInfo {
51 pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53 match region_stat.region_manifest {
54 RegionManifestInfo::Metric {
55 data_manifest_version,
56 data_flushed_entry_id,
57 metadata_manifest_version,
58 metadata_flushed_entry_id,
59 } => LeaderRegionManifestInfo::Metric {
60 data_manifest_version,
61 data_flushed_entry_id,
62 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63 metadata_manifest_version,
64 metadata_flushed_entry_id,
65 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66 },
67 RegionManifestInfo::Mito {
68 manifest_version,
69 flushed_entry_id,
70 } => LeaderRegionManifestInfo::Mito {
71 manifest_version,
72 flushed_entry_id,
73 topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
74 },
75 }
76 }
77
78 pub fn manifest_version(&self) -> u64 {
80 match self {
81 LeaderRegionManifestInfo::Mito {
82 manifest_version, ..
83 } => *manifest_version,
84 LeaderRegionManifestInfo::Metric {
85 data_manifest_version,
86 ..
87 } => *data_manifest_version,
88 }
89 }
90
91 pub fn flushed_entry_id(&self) -> u64 {
93 match self {
94 LeaderRegionManifestInfo::Mito {
95 flushed_entry_id, ..
96 } => *flushed_entry_id,
97 LeaderRegionManifestInfo::Metric {
98 data_flushed_entry_id,
99 ..
100 } => *data_flushed_entry_id,
101 }
102 }
103
104 pub fn prunable_entry_id(&self) -> u64 {
114 match self {
115 LeaderRegionManifestInfo::Mito {
116 flushed_entry_id,
117 topic_latest_entry_id,
118 ..
119 } => (*flushed_entry_id).max(*topic_latest_entry_id),
120 LeaderRegionManifestInfo::Metric {
121 data_flushed_entry_id,
122 data_topic_latest_entry_id,
123 metadata_flushed_entry_id,
124 metadata_topic_latest_entry_id,
125 ..
126 } => {
127 let data_prunable_entry_id =
128 (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
129 let metadata_prunable_entry_id =
130 (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
131 data_prunable_entry_id.min(metadata_prunable_entry_id)
132 }
133 }
134 }
135}
136
137pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
138
139#[derive(Default)]
143pub struct LeaderRegionRegistry {
144 inner: RwLock<HashMap<RegionId, LeaderRegion>>,
145}
146
147impl LeaderRegionRegistry {
148 pub fn new() -> Self {
150 Self {
151 inner: RwLock::new(HashMap::new()),
152 }
153 }
154
155 pub fn batch_get<I: Iterator<Item = RegionId>>(
157 &self,
158 region_ids: I,
159 ) -> HashMap<RegionId, LeaderRegion> {
160 let inner = self.inner.read().unwrap();
161 region_ids
162 .into_iter()
163 .flat_map(|region_id| {
164 inner
165 .get(®ion_id)
166 .map(|leader_region| (region_id, *leader_region))
167 })
168 .collect::<HashMap<_, _>>()
169 }
170
171 pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
173 let mut inner = self.inner.write().unwrap();
174 for (region_id, leader_region) in key_values {
175 match inner.entry(region_id) {
176 Entry::Vacant(entry) => {
177 entry.insert(leader_region);
178 }
179 Entry::Occupied(mut entry) => {
180 let manifest_version = entry.get().manifest.manifest_version();
181 if manifest_version > leader_region.manifest.manifest_version() {
182 warn!(
183 "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
184 region_id,
185 manifest_version,
186 leader_region.manifest.manifest_version()
187 );
188 } else {
189 entry.insert(leader_region);
190 }
191 }
192 }
193 }
194 }
195
196 pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
197 let mut inner = self.inner.write().unwrap();
198 for region_id in region_ids {
199 inner.remove(®ion_id);
200 }
201 }
202
203 pub fn reset(&self) {
205 let mut inner = self.inner.write().unwrap();
206 inner.clear();
207 }
208}