1use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
29use common_meta::key::table_repart::TableRepartValue;
30use common_meta::peer::Peer;
31use common_telemetry::{debug, warn};
32use store_api::storage::RegionId;
33use table::metadata::TableId;
34
35use crate::error::Result;
36use crate::gc::Region2Peers;
37use crate::gc::ctx::SchedulerCtx;
38use crate::gc::options::GcSchedulerOptions;
39use crate::gc::tracker::RegionGcTracker;
40
/// A source region that a repartition marked as dropped — it no longer
/// appears in the active table route — and is therefore a candidate for GC.
#[derive(Debug, Clone)]
pub(crate) struct DroppedRegionInfo {
    /// Id of the dropped (source) region.
    pub region_id: RegionId,
    /// Table the dropped region belonged to.
    pub table_id: TableId,
    /// Destination regions the source region was repartitioned into.
    // NOTE(review): currently unused by consumers; kept for future use.
    #[allow(unused)]
    pub dst_regions: HashSet<RegionId>,
}
50
/// Per-peer work plan produced by [`DroppedRegionCollector::collect_and_assign`].
#[derive(Debug, Default)]
pub(crate) struct DroppedRegionAssignment {
    /// Dropped regions grouped by the peer selected to GC them.
    pub regions_by_peer: HashMap<Peer, Vec<DroppedRegionInfo>>,
    /// Regions for which the peer must perform a full file listing.
    pub force_full_listing: HashMap<Peer, HashSet<RegionId>>,
    /// Synthetic region routes (region -> (leader, followers)) for regions
    /// that are absent from the real table route.
    pub region_routes_override: HashMap<Peer, Region2Peers>,
}
61
/// Collects regions dropped by repartitions and assigns them to peers for GC.
///
/// Borrows the scheduler context, configuration, and the shared per-region
/// GC tracker for the lifetime of one collection pass.
pub(crate) struct DroppedRegionCollector<'a> {
    // Scheduler context used to fetch table routes.
    ctx: &'a dyn SchedulerCtx,
    // GC scheduling options (e.g. cooldown period).
    config: &'a GcSchedulerOptions,
    // Shared tracker recording when each region was last GC'ed.
    tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
}
68
69impl<'a> DroppedRegionCollector<'a> {
70 pub fn new(
71 ctx: &'a dyn SchedulerCtx,
72 config: &'a GcSchedulerOptions,
73 tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
74 ) -> Self {
75 Self {
76 ctx,
77 config,
78 tracker,
79 }
80 }
81
82 pub async fn collect_and_assign(
89 &self,
90 table_reparts: &[(TableId, TableRepartValue)],
91 ) -> Result<DroppedRegionAssignment> {
92 let active_region_ids: HashSet<RegionId> = {
94 let table_ids = table_reparts
95 .iter()
96 .map(|(table_id, _)| *table_id)
97 .collect::<Vec<_>>();
98 let mut active_region_ids = HashSet::new();
99
100 let table_routes = self.ctx.batch_get_table_route(&table_ids).await?;
101 for table_route in table_routes.values() {
102 for region in &table_route.region_routes {
103 active_region_ids.insert(region.region.id);
104 }
105 }
106
107 active_region_ids
108 };
109
110 let dropped_regions = self.identify_dropped_regions(table_reparts, &active_region_ids);
111
112 if dropped_regions.is_empty() {
113 return Ok(DroppedRegionAssignment::default());
114 }
115
116 let dropped_regions = self.filter_by_cooldown(dropped_regions).await;
117
118 if dropped_regions.is_empty() {
119 return Ok(DroppedRegionAssignment::default());
120 }
121
122 self.assign_to_peers(dropped_regions).await
123 }
124
125 fn identify_dropped_regions(
128 &self,
129 table_reparts: &[(TableId, TableRepartValue)],
130 active_region_ids: &HashSet<RegionId>,
131 ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
132 let mut dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
133 HashMap::new();
134
135 for (table_id, repart) in table_reparts {
136 if repart.src_to_dst.is_empty() {
137 continue;
138 }
139
140 let entry = dropped_regions.entry(*table_id).or_default();
141 for (src_region, dst_regions) in repart.clone().src_to_dst {
142 if !active_region_ids.contains(&src_region) {
143 entry.insert(src_region, dst_regions.into_iter().collect());
144 }
145 }
146 }
147
148 dropped_regions.retain(|_, regions| !regions.is_empty());
149 dropped_regions
150 }
151
152 async fn filter_by_cooldown(
154 &self,
155 dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
156 ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
157 let now = Instant::now();
158 let tracker = self.tracker.lock().await;
159 let mut filtered = HashMap::new();
160
161 for (table_id, regions) in dropped_regions {
162 let mut kept = HashMap::new();
163 for (region_id, dst_regions) in regions {
164 if let Some(gc_info) = tracker.get(®ion_id) {
165 let elapsed = now.saturating_duration_since(gc_info.last_gc_time);
166 if elapsed < self.config.gc_cooldown_period {
167 debug!("Skipping dropped region {} due to cooldown", region_id);
168 continue;
169 }
170 }
171 kept.insert(region_id, dst_regions);
172 }
173
174 if !kept.is_empty() {
175 filtered.insert(table_id, kept);
176 }
177 }
178
179 filtered
180 }
181
182 async fn assign_to_peers(
189 &self,
190 dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
191 ) -> Result<DroppedRegionAssignment> {
192 let mut assignment = DroppedRegionAssignment::default();
193
194 for (table_id, regions) in dropped_regions {
195 let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await {
196 Ok(route) => route,
197 Err(e) => {
198 warn!(
199 "Failed to get table route for table {}: {}, skipping dropped regions",
200 table_id, e
201 );
202 continue;
203 }
204 };
205
206 if phy_table_id != table_id {
207 continue;
208 }
209
210 let active_region_ids: HashSet<RegionId> = table_route
211 .region_routes
212 .iter()
213 .map(|r| r.region.id)
214 .collect();
215
216 let mut leader_peers: Vec<Peer> = table_route
217 .region_routes
218 .iter()
219 .filter_map(|r| r.leader_peer.clone())
220 .collect();
221 leader_peers.sort_by_key(|peer| peer.id);
222 leader_peers.dedup_by_key(|peer| peer.id);
223
224 if leader_peers.is_empty() {
225 warn!(
226 "No leader peers found for table {}, skipping dropped regions",
227 table_id
228 );
229 continue;
230 }
231
232 for (region_id, dst_regions) in regions {
233 if active_region_ids.contains(®ion_id) {
234 debug!(
235 "Skipping dropped region {} since it still exists in table route",
236 region_id
237 );
238 continue;
239 }
240
241 let selected_idx = (region_id.as_u64() as usize) % leader_peers.len();
242 let peer = leader_peers[selected_idx].clone();
243
244 let info = DroppedRegionInfo {
245 region_id,
246 table_id,
247 dst_regions,
248 };
249
250 assignment
251 .regions_by_peer
252 .entry(peer.clone())
253 .or_default()
254 .push(info);
255
256 assignment
257 .force_full_listing
258 .entry(peer.clone())
259 .or_default()
260 .insert(region_id);
261
262 assignment
263 .region_routes_override
264 .entry(peer.clone())
265 .or_default()
266 .insert(region_id, (peer, Vec::new()));
267 }
268 }
269
270 Ok(assignment)
271 }
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dropped_region_info() {
        // Construct an info for region 1 of table 1 with no destinations and
        // verify the fields round-trip unchanged.
        let region_id = RegionId::new(1, 1);
        let info = DroppedRegionInfo {
            region_id,
            table_id: 1,
            dst_regions: HashSet::new(),
        };
        assert_eq!(info.region_id, region_id);
        assert_eq!(info.table_id, 1);
    }
}
288}