common_meta/
region_registry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use common_telemetry::warn;
20use store_api::storage::RegionId;
21
22use crate::datanode::{RegionManifestInfo, RegionStat};
23
24/// Represents information about a leader region in the cluster.
25/// Contains the datanode id where the leader is located,
26/// and the current manifest version.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct LeaderRegion {
29    pub datanode_id: u64,
30    pub manifest: LeaderRegionManifestInfo,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
34pub enum LeaderRegionManifestInfo {
35    Mito {
36        manifest_version: u64,
37        flushed_entry_id: u64,
38        topic_latest_entry_id: u64,
39    },
40    Metric {
41        data_manifest_version: u64,
42        data_flushed_entry_id: u64,
43        data_topic_latest_entry_id: u64,
44        metadata_manifest_version: u64,
45        metadata_flushed_entry_id: u64,
46        metadata_topic_latest_entry_id: u64,
47    },
48}
49
50impl LeaderRegionManifestInfo {
51    /// Generate a [LeaderRegionManifestInfo] from [RegionStat].
52    pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
53        match region_stat.region_manifest {
54            RegionManifestInfo::Metric {
55                data_manifest_version,
56                data_flushed_entry_id,
57                metadata_manifest_version,
58                metadata_flushed_entry_id,
59            } => LeaderRegionManifestInfo::Metric {
60                data_manifest_version,
61                data_flushed_entry_id,
62                data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
63                metadata_manifest_version,
64                metadata_flushed_entry_id,
65                metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
66            },
67            RegionManifestInfo::Mito {
68                manifest_version,
69                flushed_entry_id,
70            } => LeaderRegionManifestInfo::Mito {
71                manifest_version,
72                flushed_entry_id,
73                topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
74            },
75        }
76    }
77
78    /// Returns the manifest version of the leader region.
79    pub fn manifest_version(&self) -> u64 {
80        match self {
81            LeaderRegionManifestInfo::Mito {
82                manifest_version, ..
83            } => *manifest_version,
84            LeaderRegionManifestInfo::Metric {
85                data_manifest_version,
86                ..
87            } => *data_manifest_version,
88        }
89    }
90
91    /// Returns the flushed entry id of the leader region.
92    pub fn flushed_entry_id(&self) -> u64 {
93        match self {
94            LeaderRegionManifestInfo::Mito {
95                flushed_entry_id, ..
96            } => *flushed_entry_id,
97            LeaderRegionManifestInfo::Metric {
98                data_flushed_entry_id,
99                ..
100            } => *data_flushed_entry_id,
101        }
102    }
103
104    /// Returns prunable entry id of the leader region.
105    /// It is used to determine the entry id that can be pruned in remote wal.
106    ///
107    /// For a mito region, the prunable entry id should max(flushed_entry_id, latest_entry_id_since_flush).
108    ///
109    /// For a metric region, the prunable entry id should min(
110    ///     max(data_flushed_entry_id, data_latest_entry_id_since_flush),
111    ///     max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
112    /// ).
113    pub fn prunable_entry_id(&self) -> u64 {
114        match self {
115            LeaderRegionManifestInfo::Mito {
116                flushed_entry_id,
117                topic_latest_entry_id,
118                ..
119            } => (*flushed_entry_id).max(*topic_latest_entry_id),
120            LeaderRegionManifestInfo::Metric {
121                data_flushed_entry_id,
122                data_topic_latest_entry_id,
123                metadata_flushed_entry_id,
124                metadata_topic_latest_entry_id,
125                ..
126            } => {
127                let data_prunable_entry_id =
128                    (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
129                let metadata_prunable_entry_id =
130                    (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
131                data_prunable_entry_id.min(metadata_prunable_entry_id)
132            }
133        }
134    }
135}
136
137pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
138
139/// Registry that maintains a mapping of all leader regions in the cluster.
140/// Tracks which datanode is hosting the leader for each region and the corresponding
141/// manifest version.
142#[derive(Default)]
143pub struct LeaderRegionRegistry {
144    inner: RwLock<HashMap<RegionId, LeaderRegion>>,
145}
146
147impl LeaderRegionRegistry {
148    /// Creates a new empty leader region registry.
149    pub fn new() -> Self {
150        Self {
151            inner: RwLock::new(HashMap::new()),
152        }
153    }
154
155    /// Gets the leader region for the given region ids.
156    pub fn batch_get<I: Iterator<Item = RegionId>>(
157        &self,
158        region_ids: I,
159    ) -> HashMap<RegionId, LeaderRegion> {
160        let inner = self.inner.read().unwrap();
161        region_ids
162            .into_iter()
163            .flat_map(|region_id| {
164                inner
165                    .get(&region_id)
166                    .map(|leader_region| (region_id, *leader_region))
167            })
168            .collect::<HashMap<_, _>>()
169    }
170
171    /// Puts the leader regions into the registry.
172    pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
173        let mut inner = self.inner.write().unwrap();
174        for (region_id, leader_region) in key_values {
175            match inner.entry(region_id) {
176                Entry::Vacant(entry) => {
177                    entry.insert(leader_region);
178                }
179                Entry::Occupied(mut entry) => {
180                    let manifest_version = entry.get().manifest.manifest_version();
181                    if manifest_version > leader_region.manifest.manifest_version() {
182                        warn!(
183                            "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
184                            region_id,
185                            manifest_version,
186                            leader_region.manifest.manifest_version()
187                        );
188                    } else {
189                        entry.insert(leader_region);
190                    }
191                }
192            }
193        }
194    }
195
196    pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
197        let mut inner = self.inner.write().unwrap();
198        for region_id in region_ids {
199            inner.remove(&region_id);
200        }
201    }
202
203    /// Resets the registry to an empty state.
204    pub fn reset(&self) {
205        let mut inner = self.inner.write().unwrap();
206        inner.clear();
207    }
208}