common_meta/
region_registry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24/// Represents information about a leader region in the cluster.
25/// Contains the datanode id where the leader is located,
26/// and the current manifest version.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29    pub datanode_id: u64,
30    pub manifest: LeaderRegionManifestInfo,
31}
32
/// Manifest versions and entry ids recorded for a leader region.
///
/// The variant tells which kind of region the values describe.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum LeaderRegionManifestInfo {
    /// Values of a mito region.
    Mito {
        /// Manifest version of the region.
        manifest_version: u64,
        /// Flushed entry id of the region.
        flushed_entry_id: u64,
        /// Latest entry id of the topic recorded for the region.
        topic_latest_entry_id: u64,
    },
    /// Values of a metric region, tracked separately for its data region
    /// (`data_*` fields) and its metadata region (`metadata_*` fields).
    Metric {
        /// Manifest version of the data region.
        data_manifest_version: u64,
        /// Flushed entry id of the data region.
        data_flushed_entry_id: u64,
        /// Latest entry id of the topic recorded for the data region.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata region.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata region.
        metadata_flushed_entry_id: u64,
        /// Latest entry id of the topic recorded for the metadata region.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51    /// Generate a [LeaderRegionManifestInfo] from [RegionStat].
52    pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53        match region_stat.region_manifest {
54            RegionManifestInfo::Metric {
55                data_manifest_version,
56                data_flushed_entry_id,
57                metadata_manifest_version,
58                metadata_flushed_entry_id,
59            } => LeaderRegionManifestInfo::Metric {
60                data_manifest_version,
61                data_flushed_entry_id,
62                data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63                metadata_manifest_version,
64                metadata_flushed_entry_id,
65                metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66            },
67            RegionManifestInfo::Mito {
68                manifest_version,
69                flushed_entry_id,
70                file_removed_cnt: _,
71            } => LeaderRegionManifestInfo::Mito {
72                manifest_version,
73                flushed_entry_id,
74                topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
75            },
76        }
77    }
78
79    /// Returns the manifest version of the leader region.
80    pub fn manifest_version(&self) -> u64 {
81        match self {
82            LeaderRegionManifestInfo::Mito {
83                manifest_version, ..
84            } => *manifest_version,
85            LeaderRegionManifestInfo::Metric {
86                data_manifest_version,
87                ..
88            } => *data_manifest_version,
89        }
90    }
91
92    /// Returns the flushed entry id of the leader region.
93    pub fn flushed_entry_id(&self) -> u64 {
94        match self {
95            LeaderRegionManifestInfo::Mito {
96                flushed_entry_id, ..
97            } => *flushed_entry_id,
98            LeaderRegionManifestInfo::Metric {
99                data_flushed_entry_id,
100                ..
101            } => *data_flushed_entry_id,
102        }
103    }
104
105    /// Returns prunable entry id of the leader region.
106    /// It is used to determine the entry id that can be pruned in remote wal.
107    ///
108    /// For a mito region, the prunable entry id should max(flushed_entry_id, latest_entry_id_since_flush).
109    ///
110    /// For a metric region, the prunable entry id should min(
111    ///     max(data_flushed_entry_id, data_latest_entry_id_since_flush),
112    ///     max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
113    /// ).
114    pub fn prunable_entry_id(&self) -> u64 {
115        match self {
116            LeaderRegionManifestInfo::Mito {
117                flushed_entry_id,
118                topic_latest_entry_id,
119                ..
120            } => (*flushed_entry_id).max(*topic_latest_entry_id),
121            LeaderRegionManifestInfo::Metric {
122                data_flushed_entry_id,
123                data_topic_latest_entry_id,
124                metadata_flushed_entry_id,
125                metadata_topic_latest_entry_id,
126                ..
127            } => {
128                let data_prunable_entry_id =
129                    (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
130                let metadata_prunable_entry_id =
131                    (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
132                data_prunable_entry_id.min(metadata_prunable_entry_id)
133            }
134        }
135    }
136
137    /// Returns the replay entry id of the data region.
138    pub fn replay_entry_id(&self) -> u64 {
139        match self {
140            LeaderRegionManifestInfo::Mito {
141                flushed_entry_id,
142                topic_latest_entry_id,
143                ..
144            } => (*flushed_entry_id).max(*topic_latest_entry_id),
145            LeaderRegionManifestInfo::Metric {
146                data_flushed_entry_id,
147                data_topic_latest_entry_id,
148                ..
149            } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
150        }
151    }
152
153    /// Returns the replay entry id of the metadata region.
154    pub fn metadata_replay_entry_id(&self) -> Option<u64> {
155        match self {
156            LeaderRegionManifestInfo::Metric {
157                metadata_flushed_entry_id,
158                metadata_topic_latest_entry_id,
159                ..
160            } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
161            _ => None,
162        }
163    }
164
165    /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
166    ///
167    /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
168    /// This means that within the range `[flushed_entry_id, topic_latest_entry_id]`,
169    /// there is no data written to the memtable.
170    /// Therefore, such a region can be considered inactive.
171    pub fn is_inactive(&self) -> bool {
172        match *self {
173            LeaderRegionManifestInfo::Mito {
174                flushed_entry_id,
175                topic_latest_entry_id,
176                ..
177            } => flushed_entry_id < topic_latest_entry_id,
178            LeaderRegionManifestInfo::Metric {
179                data_flushed_entry_id,
180                data_topic_latest_entry_id,
181                metadata_flushed_entry_id,
182                metadata_topic_latest_entry_id,
183                ..
184            } => {
185                data_flushed_entry_id < data_topic_latest_entry_id
186                    || metadata_flushed_entry_id < metadata_topic_latest_entry_id
187            }
188        }
189    }
190}
191
192pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
193
194/// Registry that maintains a mapping of all leader regions in the cluster.
195/// Tracks which datanode is hosting the leader for each region and the corresponding
196/// manifest version.
197#[derive(Default)]
198pub struct LeaderRegionRegistry {
199    inner: RwLock<HashMap<RegionId, LeaderRegion>>,
200}
201
202impl LeaderRegionRegistry {
203    /// Creates a new empty leader region registry.
204    pub fn new() -> Self {
205        Self {
206            inner: RwLock::new(HashMap::new()),
207        }
208    }
209
210    /// Gets the leader region for the given region ids.
211    pub fn batch_get<I: Iterator<Item = RegionId>>(
212        &self,
213        region_ids: I,
214    ) -> HashMap<RegionId, LeaderRegion> {
215        let inner = self.inner.read().unwrap();
216        region_ids
217            .into_iter()
218            .flat_map(|region_id| {
219                inner
220                    .get(&region_id)
221                    .map(|leader_region| (region_id, *leader_region))
222            })
223            .collect::<HashMap<_, _>>()
224    }
225
226    /// Puts the leader regions into the registry.
227    pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
228        let mut inner = self.inner.write().unwrap();
229        for (region_id, leader_region) in key_values {
230            match inner.entry(region_id) {
231                Entry::Vacant(entry) => {
232                    entry.insert(leader_region);
233                }
234                Entry::Occupied(mut entry) => {
235                    let manifest_version = entry.get().manifest.manifest_version();
236                    if manifest_version > leader_region.manifest.manifest_version() {
237                        warn!(
238                            "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
239                            region_id,
240                            manifest_version,
241                            leader_region.manifest.manifest_version()
242                        );
243                    } else {
244                        entry.insert(leader_region);
245                    }
246                }
247            }
248        }
249    }
250
251    pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
252        let mut inner = self.inner.write().unwrap();
253        for region_id in region_ids {
254            inner.remove(&region_id);
255        }
256    }
257
258    /// Resets the registry to an empty state.
259    pub fn reset(&self) {
260        let mut inner = self.inner.write().unwrap();
261        inner.clear();
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a mito manifest info whose manifest version is fixed to 1.
    fn mito(flushed: u64, topic_latest: u64) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id: flushed,
            topic_latest_entry_id: topic_latest,
        }
    }

    /// Builds a metric manifest info whose manifest versions are fixed to 1.
    fn metric(
        data_flushed: u64,
        data_topic_latest: u64,
        metadata_flushed: u64,
        metadata_topic_latest: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            data_flushed_entry_id: data_flushed,
            data_topic_latest_entry_id: data_topic_latest,
            metadata_manifest_version: 1,
            metadata_flushed_entry_id: metadata_flushed,
            metadata_topic_latest_entry_id: metadata_topic_latest,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected inactive)
        let cases: [(u64, u64, bool); 3] = [
            // flushed_entry_id < topic_latest_entry_id
            (10, 20, true),
            // flushed_entry_id == topic_latest_entry_id
            (20, 20, false),
            // flushed_entry_id > topic_latest_entry_id
            (30, 20, false),
        ];
        for &(flushed, topic_latest, expected) in &cases {
            assert_eq!(
                mito(flushed, topic_latest).is_inactive(),
                expected,
                "flushed: {}, topic_latest: {}",
                flushed,
                topic_latest
            );
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // (data_flushed, data_topic_latest, metadata_flushed, metadata_topic_latest, expected)
        let cases: [(u64, u64, u64, u64, bool); 5] = [
            // only the data part is behind its topic
            (5, 10, 20, 20, true),
            // only the metadata part is behind its topic
            (10, 10, 15, 20, true),
            // both parts are behind
            (1, 2, 3, 4, true),
            // both parts are equal to their topic
            (10, 10, 20, 20, false),
            // both parts are ahead of their topic
            (30, 20, 40, 20, false),
        ];
        for &(data_flushed, data_latest, metadata_flushed, metadata_latest, expected) in &cases {
            let info = metric(data_flushed, data_latest, metadata_flushed, metadata_latest);
            assert_eq!(info.is_inactive(), expected, "info: {:?}", info);
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected max of the two)
        let cases: [(u64, u64, u64); 3] = [(100, 120, 120), (150, 120, 150), (0, 0, 0)];
        for &(flushed, topic_latest, expected) in &cases {
            assert_eq!(mito(flushed, topic_latest).prunable_entry_id(), expected);
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // Expected value is min(max(data pair), max(metadata pair)).
        let cases: [(u64, u64, u64, u64, u64); 3] = [
            // min(max(100, 120), max(90, 110)) = min(120, 110)
            (100, 120, 90, 110, 110),
            // min(max(200, 150), max(180, 220)) = min(200, 220)
            (200, 150, 180, 220, 200),
            (0, 0, 0, 0, 0),
        ];
        for &(data_flushed, data_latest, metadata_flushed, metadata_latest, expected) in &cases {
            let info = metric(data_flushed, data_latest, metadata_flushed, metadata_latest);
            assert_eq!(info.prunable_entry_id(), expected, "info: {:?}", info);
        }
    }
}