common_meta/
region_registry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24/// Represents information about a leader region in the cluster.
25/// Contains the datanode id where the leader is located,
26/// and the current manifest version.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29    pub datanode_id: u64,
30    pub manifest: LeaderRegionManifestInfo,
31}
32
/// Manifest and WAL progress of a leader region, per region engine.
///
/// A mito region carries a single set of values, while a metric region tracks
/// its data part and its metadata part separately.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
    /// A mito region.
    Mito {
        /// Manifest version of the region.
        manifest_version: u64,
        /// Flushed entry id of the region.
        flushed_entry_id: u64,
        /// Latest entry id of the region's WAL topic.
        topic_latest_entry_id: u64,
    },
    /// A metric region, made of a data part and a metadata part.
    Metric {
        /// Manifest version of the data part.
        data_manifest_version: u64,
        /// Flushed entry id of the data part.
        data_flushed_entry_id: u64,
        /// Latest entry id of the data part's WAL topic.
        data_topic_latest_entry_id: u64,
        /// Manifest version of the metadata part.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
        /// Latest entry id of the metadata part's WAL topic.
        metadata_topic_latest_entry_id: u64,
    },
}
49
50impl LeaderRegionManifestInfo {
51    /// Generate a [LeaderRegionManifestInfo] from [RegionStat].
52    pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53        match region_stat.region_manifest {
54            RegionManifestInfo::Metric {
55                data_manifest_version,
56                data_flushed_entry_id,
57                metadata_manifest_version,
58                metadata_flushed_entry_id,
59            } => LeaderRegionManifestInfo::Metric {
60                data_manifest_version,
61                data_flushed_entry_id,
62                data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63                metadata_manifest_version,
64                metadata_flushed_entry_id,
65                metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66            },
67            RegionManifestInfo::Mito {
68                manifest_version,
69                flushed_entry_id,
70            } => LeaderRegionManifestInfo::Mito {
71                manifest_version,
72                flushed_entry_id,
73                topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
74            },
75        }
76    }
77
78    /// Returns the manifest version of the leader region.
79    pub fn manifest_version(&self) -> u64 {
80        match self {
81            LeaderRegionManifestInfo::Mito {
82                manifest_version, ..
83            } => *manifest_version,
84            LeaderRegionManifestInfo::Metric {
85                data_manifest_version,
86                ..
87            } => *data_manifest_version,
88        }
89    }
90
91    /// Returns the flushed entry id of the leader region.
92    pub fn flushed_entry_id(&self) -> u64 {
93        match self {
94            LeaderRegionManifestInfo::Mito {
95                flushed_entry_id, ..
96            } => *flushed_entry_id,
97            LeaderRegionManifestInfo::Metric {
98                data_flushed_entry_id,
99                ..
100            } => *data_flushed_entry_id,
101        }
102    }
103
104    /// Returns prunable entry id of the leader region.
105    /// It is used to determine the entry id that can be pruned in remote wal.
106    ///
107    /// For a mito region, the prunable entry id should max(flushed_entry_id, latest_entry_id_since_flush).
108    ///
109    /// For a metric region, the prunable entry id should min(
110    ///     max(data_flushed_entry_id, data_latest_entry_id_since_flush),
111    ///     max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
112    /// ).
113    pub fn prunable_entry_id(&self) -> u64 {
114        match self {
115            LeaderRegionManifestInfo::Mito {
116                flushed_entry_id,
117                topic_latest_entry_id,
118                ..
119            } => (*flushed_entry_id).max(*topic_latest_entry_id),
120            LeaderRegionManifestInfo::Metric {
121                data_flushed_entry_id,
122                data_topic_latest_entry_id,
123                metadata_flushed_entry_id,
124                metadata_topic_latest_entry_id,
125                ..
126            } => {
127                let data_prunable_entry_id =
128                    (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
129                let metadata_prunable_entry_id =
130                    (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
131                data_prunable_entry_id.min(metadata_prunable_entry_id)
132            }
133        }
134    }
135
136    /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
137    ///
138    /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
139    /// This means that within the range `[flushed_entry_id, topic_latest_entry_id]`,
140    /// there is no data written to the memtable.
141    /// Therefore, such a region can be considered inactive.
142    pub fn is_inactive(&self) -> bool {
143        match *self {
144            LeaderRegionManifestInfo::Mito {
145                flushed_entry_id,
146                topic_latest_entry_id,
147                ..
148            } => flushed_entry_id < topic_latest_entry_id,
149            LeaderRegionManifestInfo::Metric {
150                data_flushed_entry_id,
151                data_topic_latest_entry_id,
152                metadata_flushed_entry_id,
153                metadata_topic_latest_entry_id,
154                ..
155            } => {
156                data_flushed_entry_id < data_topic_latest_entry_id
157                    || metadata_flushed_entry_id < metadata_topic_latest_entry_id
158            }
159        }
160    }
161}
162
163pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
164
165/// Registry that maintains a mapping of all leader regions in the cluster.
166/// Tracks which datanode is hosting the leader for each region and the corresponding
167/// manifest version.
168#[derive(Default)]
169pub struct LeaderRegionRegistry {
170    inner: RwLock<HashMap<RegionId, LeaderRegion>>,
171}
172
173impl LeaderRegionRegistry {
174    /// Creates a new empty leader region registry.
175    pub fn new() -> Self {
176        Self {
177            inner: RwLock::new(HashMap::new()),
178        }
179    }
180
181    /// Gets the leader region for the given region ids.
182    pub fn batch_get<I: Iterator<Item = RegionId>>(
183        &self,
184        region_ids: I,
185    ) -> HashMap<RegionId, LeaderRegion> {
186        let inner = self.inner.read().unwrap();
187        region_ids
188            .into_iter()
189            .flat_map(|region_id| {
190                inner
191                    .get(&region_id)
192                    .map(|leader_region| (region_id, *leader_region))
193            })
194            .collect::<HashMap<_, _>>()
195    }
196
197    /// Puts the leader regions into the registry.
198    pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
199        let mut inner = self.inner.write().unwrap();
200        for (region_id, leader_region) in key_values {
201            match inner.entry(region_id) {
202                Entry::Vacant(entry) => {
203                    entry.insert(leader_region);
204                }
205                Entry::Occupied(mut entry) => {
206                    let manifest_version = entry.get().manifest.manifest_version();
207                    if manifest_version > leader_region.manifest.manifest_version() {
208                        warn!(
209                            "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
210                            region_id,
211                            manifest_version,
212                            leader_region.manifest.manifest_version()
213                        );
214                    } else {
215                        entry.insert(leader_region);
216                    }
217                }
218            }
219        }
220    }
221
222    pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
223        let mut inner = self.inner.write().unwrap();
224        for region_id in region_ids {
225            inner.remove(&region_id);
226        }
227    }
228
229    /// Resets the registry to an empty state.
230    pub fn reset(&self) {
231        let mut inner = self.inner.write().unwrap();
232        inner.clear();
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            flushed_entry_id,
            topic_latest_entry_id,
            manifest_version: 1,
        }
    }

    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
            data_manifest_version: 1,
            metadata_manifest_version: 1,
        }
    }

    /// Builds a mito leader region hosted on `datanode_id` with the given manifest version.
    fn mito_leader_region(datanode_id: u64, manifest_version: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id: 0,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // inactive: flushed_entry_id < topic_latest_entry_id
        let info = mito_manifest_info(10, 20);
        assert!(info.is_inactive());
        // active: flushed_entry_id == topic_latest_entry_id
        let info = mito_manifest_info(20, 20);
        assert!(!info.is_inactive());
        // active: flushed_entry_id > topic_latest_entry_id
        let info = mito_manifest_info(30, 20);
        assert!(!info.is_inactive());
    }

    #[test]
    fn test_is_inactive_metric() {
        // inactive: data_flushed_entry_id < data_topic_latest_entry_id
        let info = metric_manifest_info(5, 10, 20, 20);
        assert!(info.is_inactive());
        // inactive: metadata_flushed_entry_id < metadata_topic_latest_entry_id
        let info = metric_manifest_info(10, 10, 15, 20);
        assert!(info.is_inactive());
        // inactive: both are less
        let info = metric_manifest_info(1, 2, 3, 4);
        assert!(info.is_inactive());
        // active: both are equal
        let info = metric_manifest_info(10, 10, 20, 20);
        assert!(!info.is_inactive());
        // active: both are greater
        let info = metric_manifest_info(30, 20, 40, 20);
        assert!(!info.is_inactive());
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        let info = mito_manifest_info(100, 120);
        // max(100, 120) = 120
        assert_eq!(info.prunable_entry_id(), 120);

        let info = mito_manifest_info(150, 120);
        // max(150, 120) = 150
        assert_eq!(info.prunable_entry_id(), 150);

        let info = mito_manifest_info(0, 0);
        assert_eq!(info.prunable_entry_id(), 0);
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        let info = metric_manifest_info(100, 120, 90, 110);
        // data_prunable = max(100,120)=120
        // metadata_prunable = max(90,110)=110
        // min(120,110)=110
        assert_eq!(info.prunable_entry_id(), 110);
        let info = metric_manifest_info(200, 150, 180, 220);
        // data_prunable = max(200,150)=200
        // metadata_prunable = max(180,220)=220
        // min(200,220)=200
        assert_eq!(info.prunable_entry_id(), 200);
        let info = metric_manifest_info(0, 0, 0, 0);
        assert_eq!(info.prunable_entry_id(), 0);
    }

    #[test]
    fn test_registry_batch_put_and_get() {
        let registry = LeaderRegionRegistry::new();
        let region_a = RegionId::new(1024, 1);
        let region_b = RegionId::new(1024, 2);
        let missing = RegionId::new(1024, 3);
        let leader_a = mito_leader_region(1, 1);
        let leader_b = mito_leader_region(2, 1);
        registry.batch_put(vec![(region_a, leader_a), (region_b, leader_b)]);

        let got = registry.batch_get(vec![region_a, region_b, missing].into_iter());
        // Unknown region ids are omitted from the result.
        assert_eq!(got.len(), 2);
        assert_eq!(got.get(&region_a), Some(&leader_a));
        assert_eq!(got.get(&region_b), Some(&leader_b));
        assert!(!got.contains_key(&missing));
    }

    #[test]
    fn test_registry_batch_put_respects_manifest_version() {
        let registry = LeaderRegionRegistry::new();
        let region_id = RegionId::new(1024, 1);
        let current = mito_leader_region(1, 5);
        registry.batch_put(vec![(region_id, current)]);

        // A smaller manifest version must not overwrite the existing entry.
        registry.batch_put(vec![(region_id, mito_leader_region(2, 3))]);
        let got = registry.batch_get(vec![region_id].into_iter());
        assert_eq!(got.get(&region_id), Some(&current));

        // An equal manifest version replaces the existing entry.
        let same_version = mito_leader_region(3, 5);
        registry.batch_put(vec![(region_id, same_version)]);
        let got = registry.batch_get(vec![region_id].into_iter());
        assert_eq!(got.get(&region_id), Some(&same_version));

        // A greater manifest version replaces the existing entry.
        let newer = mito_leader_region(4, 6);
        registry.batch_put(vec![(region_id, newer)]);
        let got = registry.batch_get(vec![region_id].into_iter());
        assert_eq!(got.get(&region_id), Some(&newer));
    }

    #[test]
    fn test_registry_batch_delete_and_reset() {
        let registry = LeaderRegionRegistry::new();
        let region_a = RegionId::new(1024, 1);
        let region_b = RegionId::new(1024, 2);
        registry.batch_put(vec![
            (region_a, mito_leader_region(1, 1)),
            (region_b, mito_leader_region(2, 1)),
        ]);

        // Deleting a present id and an absent id: only the present one is affected.
        registry.batch_delete(vec![region_a, RegionId::new(1024, 3)].into_iter());
        let got = registry.batch_get(vec![region_a, region_b].into_iter());
        assert!(!got.contains_key(&region_a));
        assert!(got.contains_key(&region_b));

        registry.reset();
        let got = registry.batch_get(vec![region_a, region_b].into_iter());
        assert!(got.is_empty());
    }
}