1use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
29use common_meta::key::table_repart::TableRepartValue;
30use common_meta::peer::Peer;
31use common_telemetry::{debug, warn};
32use store_api::storage::RegionId;
33use table::metadata::TableId;
34
35use crate::error::Result;
36use crate::gc::Region2Peers;
37use crate::gc::ctx::SchedulerCtx;
38use crate::gc::options::GcSchedulerOptions;
39use crate::gc::tracker::RegionGcTracker;
40
/// Information about a single region that was dropped as the *source* of a
/// table repartition (it no longer appears in the table's region routes).
#[derive(Debug, Clone)]
pub(crate) struct DroppedRegionInfo {
    /// Id of the dropped (repart source) region.
    pub region_id: RegionId,
    /// Table the dropped region belonged to.
    pub table_id: TableId,
    /// Destination regions the source region was repartitioned into.
    // Not read by any consumer yet; kept for diagnostics/future use, hence the allow.
    #[allow(unused)]
    pub dst_regions: HashSet<RegionId>,
}
50
/// Result of assigning dropped regions to datanode peers for GC
/// (produced by `DroppedRegionCollector::assign_to_peers`).
#[derive(Debug, Default)]
pub(crate) struct DroppedRegionAssignment {
    /// Dropped regions grouped by the peer selected to GC them.
    pub regions_by_peer: HashMap<Peer, Vec<DroppedRegionInfo>>,
    /// Per peer, the dropped regions that should be force-listed in full
    /// (these regions are absent from the current table routes).
    pub force_full_listing: HashMap<Peer, HashSet<RegionId>>,
    /// Per peer, synthesized routes for the dropped regions: each region is
    /// mapped to the selected peer with an empty follower list.
    pub region_routes_override: HashMap<Peer, Region2Peers>,
}
61
/// Collects regions dropped by table repartitioning and assigns them to
/// datanode peers for garbage collection.
pub(crate) struct DroppedRegionCollector<'a> {
    /// Scheduler context used to look up table routes.
    ctx: &'a dyn SchedulerCtx,
    /// GC scheduler options (provides `gc_cooldown_period`).
    config: &'a GcSchedulerOptions,
    /// Per-region GC history (`last_gc_time`), consulted for cooldown filtering.
    tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
}
68
69impl<'a> DroppedRegionCollector<'a> {
70 pub fn new(
71 ctx: &'a dyn SchedulerCtx,
72 config: &'a GcSchedulerOptions,
73 tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
74 ) -> Self {
75 Self {
76 ctx,
77 config,
78 tracker,
79 }
80 }
81
82 pub async fn collect_and_assign(
89 &self,
90 table_reparts: &[(TableId, TableRepartValue)],
91 ) -> Result<DroppedRegionAssignment> {
92 self.collect_and_assign_with_cooldown(table_reparts, true)
93 .await
94 }
95
96 pub async fn collect_and_assign_with_cooldown(
98 &self,
99 table_reparts: &[(TableId, TableRepartValue)],
100 apply_cooldown: bool,
101 ) -> Result<DroppedRegionAssignment> {
102 let active_region_ids: HashSet<RegionId> = {
104 let table_ids = table_reparts
105 .iter()
106 .map(|(table_id, _)| *table_id)
107 .collect::<Vec<_>>();
108 let mut active_region_ids = HashSet::new();
109
110 let table_routes = self.ctx.batch_get_table_route(&table_ids).await?;
111 for table_route in table_routes.values() {
112 for region in &table_route.region_routes {
113 active_region_ids.insert(region.region.id);
114 }
115 }
116
117 active_region_ids
118 };
119
120 let dropped_regions = self.identify_dropped_regions(table_reparts, &active_region_ids);
121
122 if dropped_regions.is_empty() {
123 return Ok(DroppedRegionAssignment::default());
124 }
125
126 let dropped_regions = if apply_cooldown {
127 self.filter_by_cooldown(dropped_regions).await
128 } else {
129 dropped_regions
130 };
131
132 if dropped_regions.is_empty() {
133 return Ok(DroppedRegionAssignment::default());
134 }
135
136 self.assign_to_peers(dropped_regions).await
137 }
138
139 fn identify_dropped_regions(
142 &self,
143 table_reparts: &[(TableId, TableRepartValue)],
144 active_region_ids: &HashSet<RegionId>,
145 ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
146 let mut dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
147 HashMap::new();
148
149 for (table_id, repart) in table_reparts {
150 if repart.src_to_dst.is_empty() {
151 continue;
152 }
153
154 let entry = dropped_regions.entry(*table_id).or_default();
155 for (src_region, dst_regions) in repart.clone().src_to_dst {
156 if !active_region_ids.contains(&src_region) {
157 entry.insert(src_region, dst_regions.into_iter().collect());
158 }
159 }
160 }
161
162 dropped_regions.retain(|_, regions| !regions.is_empty());
163 dropped_regions
164 }
165
166 async fn filter_by_cooldown(
168 &self,
169 dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
170 ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
171 let now = Instant::now();
172 let tracker = self.tracker.lock().await;
173 let mut filtered = HashMap::new();
174
175 for (table_id, regions) in dropped_regions {
176 let mut kept = HashMap::new();
177 for (region_id, dst_regions) in regions {
178 if let Some(gc_info) = tracker.get(®ion_id) {
179 let elapsed = now.saturating_duration_since(gc_info.last_gc_time);
180 if elapsed < self.config.gc_cooldown_period {
181 debug!("Skipping dropped region {} due to cooldown", region_id);
182 continue;
183 }
184 }
185 kept.insert(region_id, dst_regions);
186 }
187
188 if !kept.is_empty() {
189 filtered.insert(table_id, kept);
190 }
191 }
192
193 filtered
194 }
195
196 async fn assign_to_peers(
203 &self,
204 dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
205 ) -> Result<DroppedRegionAssignment> {
206 let mut assignment = DroppedRegionAssignment::default();
207
208 for (table_id, regions) in dropped_regions {
209 let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await {
210 Ok(route) => route,
211 Err(e) => {
212 warn!(
213 "Failed to get table route for table {}: {}, skipping dropped regions",
214 table_id, e
215 );
216 continue;
217 }
218 };
219
220 if phy_table_id != table_id {
221 continue;
222 }
223
224 let active_region_ids: HashSet<RegionId> = table_route
225 .region_routes
226 .iter()
227 .map(|r| r.region.id)
228 .collect();
229
230 let mut leader_peers: Vec<Peer> = table_route
231 .region_routes
232 .iter()
233 .filter_map(|r| r.leader_peer.clone())
234 .collect();
235 leader_peers.sort_by_key(|peer| peer.id);
236 leader_peers.dedup_by_key(|peer| peer.id);
237
238 if leader_peers.is_empty() {
239 warn!(
240 "No leader peers found for table {}, skipping dropped regions",
241 table_id
242 );
243 continue;
244 }
245
246 for (region_id, dst_regions) in regions {
247 if active_region_ids.contains(®ion_id) {
248 debug!(
249 "Skipping dropped region {} since it still exists in table route",
250 region_id
251 );
252 continue;
253 }
254
255 let selected_idx = (region_id.as_u64() as usize) % leader_peers.len();
256 let peer = leader_peers[selected_idx].clone();
257
258 let info = DroppedRegionInfo {
259 region_id,
260 table_id,
261 dst_regions,
262 };
263
264 assignment
265 .regions_by_peer
266 .entry(peer.clone())
267 .or_default()
268 .push(info);
269
270 assignment
271 .force_full_listing
272 .entry(peer.clone())
273 .or_default()
274 .insert(region_id);
275
276 assignment
277 .region_routes_override
278 .entry(peer.clone())
279 .or_default()
280 .insert(region_id, (peer, Vec::new()));
281 }
282 }
283
284 Ok(assignment)
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a `DroppedRegionInfo` preserves the identifying fields.
    #[test]
    fn test_dropped_region_info() {
        let region_id = RegionId::new(1, 1);

        let info = DroppedRegionInfo {
            region_id,
            table_id: 1,
            dst_regions: HashSet::new(),
        };

        assert_eq!(info.region_id, region_id);
        assert_eq!(info.table_id, 1);
    }
}