Skip to main content

common_meta/
region_registry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::{debug, warn};
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the manifest information (manifest versions and entry ids) of the region.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
    /// Id of the datanode where the leader region is located.
    pub datanode_id: u64,
    /// Manifest information of the leader region, see [LeaderRegionManifestInfo].
    pub manifest: LeaderRegionManifestInfo,
}
32
/// Manifest versions and entry ids tracked for a leader region.
///
/// A mito region carries a single set of values. A metric region carries two sets:
/// one for its data region and one for its metadata region.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
    /// Manifest information of a mito region.
    Mito {
        /// Manifest version of the region.
        manifest_version: u64,
        /// Flushed entry id of the region.
        flushed_entry_id: u64,
        /// Latest entry id of the region's topic, as reported in
        /// `RegionStat::data_topic_latest_entry_id`.
        topic_latest_entry_id: u64,
    },
    /// Manifest information of a metric region.
    Metric {
        /// Manifest version of the data region.
        data_manifest_version: u64,
        /// Flushed entry id of the data region.
        data_flushed_entry_id: u64,
        /// Latest entry id of the data region's topic, as reported in
        /// `RegionStat::data_topic_latest_entry_id`.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata region.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata region.
        metadata_flushed_entry_id: u64,
        /// Latest entry id of the metadata region's topic, as reported in
        /// `RegionStat::metadata_topic_latest_entry_id`.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51    /// Generate a [LeaderRegionManifestInfo] from [RegionStat].
52    pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53        match region_stat.region_manifest {
54            RegionManifestInfo::Metric {
55                data_manifest_version,
56                data_flushed_entry_id,
57                metadata_manifest_version,
58                metadata_flushed_entry_id,
59            } => LeaderRegionManifestInfo::Metric {
60                data_manifest_version,
61                data_flushed_entry_id,
62                data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63                metadata_manifest_version,
64                metadata_flushed_entry_id,
65                metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66            },
67            RegionManifestInfo::Mito {
68                manifest_version,
69                flushed_entry_id,
70                file_removed_cnt: _,
71            } => LeaderRegionManifestInfo::Mito {
72                manifest_version,
73                flushed_entry_id,
74                topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
75            },
76        }
77    }
78
79    /// Returns the manifest version of the leader region.
80    pub fn manifest_version(&self) -> u64 {
81        match self {
82            LeaderRegionManifestInfo::Mito {
83                manifest_version, ..
84            } => *manifest_version,
85            LeaderRegionManifestInfo::Metric {
86                data_manifest_version,
87                ..
88            } => *data_manifest_version,
89        }
90    }
91
92    /// Returns the flushed entry id of the leader region.
93    pub fn flushed_entry_id(&self) -> u64 {
94        match self {
95            LeaderRegionManifestInfo::Mito {
96                flushed_entry_id, ..
97            } => *flushed_entry_id,
98            LeaderRegionManifestInfo::Metric {
99                data_flushed_entry_id,
100                ..
101            } => *data_flushed_entry_id,
102        }
103    }
104
105    /// Returns prunable entry id of the leader region.
106    /// It is used to determine the entry id that can be pruned in remote wal.
107    ///
108    /// For a mito region, the prunable entry id should max(flushed_entry_id, latest_entry_id_since_flush).
109    ///
110    /// For a metric region, the prunable entry id should min(
111    ///     max(data_flushed_entry_id, data_latest_entry_id_since_flush),
112    ///     max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
113    /// ).
114    pub fn prunable_entry_id(&self) -> u64 {
115        match self {
116            LeaderRegionManifestInfo::Mito {
117                flushed_entry_id,
118                topic_latest_entry_id,
119                ..
120            } => (*flushed_entry_id).max(*topic_latest_entry_id),
121            LeaderRegionManifestInfo::Metric {
122                data_flushed_entry_id,
123                data_topic_latest_entry_id,
124                metadata_flushed_entry_id,
125                metadata_topic_latest_entry_id,
126                ..
127            } => {
128                let data_prunable_entry_id =
129                    (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
130                let metadata_prunable_entry_id =
131                    (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
132                data_prunable_entry_id.min(metadata_prunable_entry_id)
133            }
134        }
135    }
136
137    /// Returns the replay entry id of the data region.
138    pub fn replay_entry_id(&self) -> u64 {
139        match self {
140            LeaderRegionManifestInfo::Mito {
141                flushed_entry_id,
142                topic_latest_entry_id,
143                ..
144            } => (*flushed_entry_id).max(*topic_latest_entry_id),
145            LeaderRegionManifestInfo::Metric {
146                data_flushed_entry_id,
147                data_topic_latest_entry_id,
148                ..
149            } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
150        }
151    }
152
153    /// Returns the replay entry id of the metadata region.
154    pub fn metadata_replay_entry_id(&self) -> Option<u64> {
155        match self {
156            LeaderRegionManifestInfo::Metric {
157                metadata_flushed_entry_id,
158                metadata_topic_latest_entry_id,
159                ..
160            } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
161            _ => None,
162        }
163    }
164
165    /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
166    ///
167    /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
168    /// This means that within the range `[flushed_entry_id, topic_latest_entry_id]`,
169    /// there is no data written to the memtable.
170    /// Therefore, such a region can be considered inactive.
171    pub fn is_inactive(&self) -> bool {
172        match *self {
173            LeaderRegionManifestInfo::Mito {
174                flushed_entry_id,
175                topic_latest_entry_id,
176                ..
177            } => flushed_entry_id < topic_latest_entry_id,
178            LeaderRegionManifestInfo::Metric {
179                data_flushed_entry_id,
180                data_topic_latest_entry_id,
181                metadata_flushed_entry_id,
182                metadata_topic_latest_entry_id,
183                ..
184            } => {
185                data_flushed_entry_id < data_topic_latest_entry_id
186                    || metadata_flushed_entry_id < metadata_topic_latest_entry_id
187            }
188        }
189    }
190}
191
/// Reference-counted ([Arc]) handle to a [LeaderRegionRegistry].
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
193
/// Registry that maintains a mapping of all leader regions in the cluster.
/// Tracks which datanode is hosting the leader for each region and the corresponding
/// manifest version.
#[derive(Default)]
pub struct LeaderRegionRegistry {
    /// Leader region per region id. Kept behind an [RwLock] so the registry can be
    /// read and updated through `&self`.
    inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}
201
202impl LeaderRegionRegistry {
203    /// Creates a new empty leader region registry.
204    pub fn new() -> Self {
205        Self {
206            inner: RwLock::new(HashMap::new()),
207        }
208    }
209
210    /// Gets the leader region for the given region ids.
211    pub fn batch_get<I: Iterator<Item = RegionId>>(
212        &self,
213        region_ids: I,
214    ) -> HashMap<RegionId, LeaderRegion> {
215        let inner = self.inner.read().unwrap();
216        region_ids
217            .into_iter()
218            .flat_map(|region_id| {
219                inner
220                    .get(&region_id)
221                    .map(|leader_region| (region_id, *leader_region))
222            })
223            .collect::<HashMap<_, _>>()
224    }
225
226    /// Puts the leader regions into the registry.
227    pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
228        let mut inner = self.inner.write().unwrap();
229        for (region_id, leader_region) in key_values {
230            match inner.entry(region_id) {
231                Entry::Vacant(entry) => {
232                    entry.insert(leader_region);
233                }
234                Entry::Occupied(mut entry) => {
235                    let manifest_version = entry.get().manifest.manifest_version();
236                    if manifest_version > leader_region.manifest.manifest_version() {
237                        warn!(
238                            "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
239                            region_id,
240                            manifest_version,
241                            leader_region.manifest.manifest_version()
242                        );
243                    } else {
244                        debug!(
245                            "Updating leader region for region {}, pruned entry id: {}",
246                            region_id,
247                            leader_region.manifest.prunable_entry_id(),
248                        );
249                        entry.insert(leader_region);
250                    }
251                }
252            }
253        }
254    }
255
256    pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
257        let mut inner = self.inner.write().unwrap();
258        for region_id in region_ids {
259            inner.remove(&region_id);
260        }
261    }
262
263    /// Resets the registry to an empty state.
264    pub fn reset(&self) {
265        let mut inner = self.inner.write().unwrap();
266        inner.clear();
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a mito manifest info with the given entry ids and a manifest version of 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id,
            topic_latest_entry_id,
        }
    }

    /// Builds a metric manifest info with the given entry ids and manifest versions of 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_manifest_version: 1,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected `is_inactive`)
        let cases = [
            // inactive: flushed_entry_id < topic_latest_entry_id
            (10, 20, true),
            // active: flushed_entry_id == topic_latest_entry_id
            (20, 20, false),
            // active: flushed_entry_id > topic_latest_entry_id
            (30, 20, false),
        ];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(
                info.is_inactive(),
                expected,
                "flushed_entry_id: {}, topic_latest_entry_id: {}",
                flushed,
                latest
            );
        }
    }

    #[test]
    fn test_is_inactive_metric() {
        // ((data_flushed, data_topic_latest, metadata_flushed, metadata_topic_latest), expected)
        let cases = [
            // inactive: data_flushed_entry_id < data_topic_latest_entry_id
            ((5, 10, 20, 20), true),
            // inactive: metadata_flushed_entry_id < metadata_topic_latest_entry_id
            ((10, 10, 15, 20), true),
            // inactive: both are less
            ((1, 2, 3, 4), true),
            // active: both are equal
            ((10, 10, 20, 20), false),
            // active: both are greater
            ((30, 20, 40, 20), false),
        ];
        for (ids, expected) in cases {
            let info = metric_manifest_info(ids.0, ids.1, ids.2, ids.3);
            assert_eq!(info.is_inactive(), expected, "entry ids: {:?}", ids);
        }
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // (flushed_entry_id, topic_latest_entry_id, expected max of the two)
        let cases = [(100, 120, 120), (150, 120, 150), (0, 0, 0)];
        for (flushed, latest, expected) in cases {
            let info = mito_manifest_info(flushed, latest);
            assert_eq!(
                info.prunable_entry_id(),
                expected,
                "flushed_entry_id: {}, topic_latest_entry_id: {}",
                flushed,
                latest
            );
        }
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // ((data_flushed, data_topic_latest, metadata_flushed, metadata_topic_latest), expected)
        let cases = [
            // data_prunable = max(100,120)=120, metadata_prunable = max(90,110)=110, min=110
            ((100, 120, 90, 110), 110),
            // data_prunable = max(200,150)=200, metadata_prunable = max(180,220)=220, min=200
            ((200, 150, 180, 220), 200),
            ((0, 0, 0, 0), 0),
        ];
        for (ids, expected) in cases {
            let info = metric_manifest_info(ids.0, ids.1, ids.2, ids.3);
            assert_eq!(info.prunable_entry_id(), expected, "entry ids: {:?}", ids);
        }
    }
}