use std::collections::{HashMap, HashSet};
use std::time::Instant;

use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use tokio::time::sleep;

use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;
use crate::region;

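/// Maps a region to its leader peer and follower peers.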
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

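/// Maps a (leader) peer to the set of regions it hosts.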
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;

impl GcScheduler {
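    /// Runs one GC cycle: fetches region stats, selects GC candidates, groups them
    /// by datanode, and processes each datanode concurrently.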
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        if per_table_candidates.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let report = self
            .parallel_process_datanodes(datanode_to_candidates)
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }

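    /// Finds regions related to the given candidate regions, e.g. regions that may
    /// hold references to the same files. The current implementation simply maps
    /// each candidate region to itself.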
    pub(crate) async fn find_related_regions(
        &self,
        candidate_region_ids: &[RegionId],
    ) -> Result<HashMap<RegionId, Vec<RegionId>>> {
        Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
    }

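    /// Groups GC candidates by the datanode hosting each region's leader. Logical
    /// tables (whose physical table id differs from their own) are skipped, since
    /// only physical tables own regions.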
    pub(crate) async fn aggregate_candidates_by_datanode(
        &self,
        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();

        for (table_id, candidates) in per_table_candidates {
            if candidates.is_empty() {
                continue;
            }

            let (phy_table_id, table_route) = self.ctx.get_table_route(table_id).await?;

            if phy_table_id != table_id {
                // Only physical tables own regions; skip logical tables.
                continue;
            }

            let region_to_peer = table_route
                .region_routes
                .iter()
                .filter_map(|r| {
                    r.leader_peer
                        .as_ref()
                        .map(|peer| (r.region.id, peer.clone()))
                })
                .collect::<HashMap<RegionId, Peer>>();

            for candidate in candidates {
                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
                    datanode_to_candidates
                        .entry(peer.clone())
                        .or_default()
                        .push((table_id, candidate));
                } else {
                    warn!(
                        "Skipping region {} for table {}: no leader peer found",
                        candidate.region_id, table_id
                    );
                }
            }
        }

        info!(
            "Aggregated GC candidates for {} datanodes",
            datanode_to_candidates.len()
        );
        Ok(datanode_to_candidates)
    }

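    /// Processes the per-datanode candidate lists concurrently, keeping at most
    /// `max_concurrent_tables` datanodes in flight at once.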
    pub(crate) async fn parallel_process_datanodes(
        &self,
        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
    ) -> GcJobReport {
        let mut report = GcJobReport::default();

        let results: Vec<_> = futures::stream::iter(
            datanode_to_candidates
                .into_iter()
                .filter(|(_, candidates)| !candidates.is_empty()),
        )
        .map(|(peer, candidates)| {
            let scheduler = self;
            let peer_clone = peer.clone();
            async move {
                (
                    peer,
                    scheduler.process_datanode_gc(peer_clone, candidates).await,
                )
            }
        })
        .buffer_unordered(self.config.max_concurrent_tables)
        .collect()
        .await;

        for (peer, result) in results {
            match result {
                Ok(dn_report) => {
                    report.per_datanode_reports.insert(peer.id, dn_report);
                }
                Err(e) => {
                    error!("Failed to process datanode GC for peer {}: {:#?}", peer, e);
                    report.failed_datanodes.entry(peer.id).or_default().push(e);
                }
            }
        }

        report
    }

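    /// Runs GC for a single datanode: resolves related regions and their peers,
    /// fetches the file reference manifest, then issues GC calls split between
    /// fast listing and full file listing, and records full-listing times.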
    pub(crate) async fn process_datanode_gc(
        &self,
        peer: Peer,
        candidates: Vec<(TableId, GcCandidate)>,
    ) -> Result<GcReport> {
        info!(
            "Starting GC for datanode {} with {} candidate regions",
            peer,
            candidates.len()
        );

        if candidates.is_empty() {
            return Ok(Default::default());
        }

        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();

        let all_related_regions = self.find_related_regions(&all_region_ids).await?;

        let (region_to_peer, _) = self
            .discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
            .await?;

        let file_refs_manifest = self
            .ctx
            .get_file_references(
                &all_region_ids,
                all_related_regions,
                &region_to_peer,
                self.config.mailbox_timeout,
            )
            .await?;

        let (gc_report, fully_listed_regions) = {
            let batch_full_listing_decisions =
                self.batch_should_use_full_listing(&all_region_ids).await;

            let need_full_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(|(&region_id, &need_full)| {
                    if need_full { Some(region_id) } else { None }
                })
                .collect_vec();
            let fast_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(|(&region_id, &need_full)| {
                    if !need_full { Some(region_id) } else { None }
                })
                .collect_vec();

            let mut combined_report = GcReport::default();

            if !fast_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        peer.clone(),
                        &fast_list_regions,
                        &file_refs_manifest,
                        false,
                        self.config.mailbox_timeout,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            "Failed to GC regions {:?} on datanode {}: {}",
                            fast_list_regions, peer, e
                        );

                        combined_report
                            .need_retry_regions
                            .extend(fast_list_regions.clone());
                    }
                }
            }

            if !need_full_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        peer.clone(),
                        &need_full_list_regions,
                        &file_refs_manifest,
                        true,
                        self.config.mailbox_timeout,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            "Failed to GC regions {:?} on datanode {}: {}",
                            need_full_list_regions, peer, e
                        );

                        combined_report
                            .need_retry_regions
                            .extend(need_full_list_regions.clone());
                    }
                }
            }

            // Only regions whose full-listing GC did not end up needing a retry
            // count as fully listed.
            let fully_listed_regions = need_full_list_regions
                .into_iter()
                .filter(|r| !combined_report.need_retry_regions.contains(r))
                .collect::<HashSet<_>>();

            (combined_report, fully_listed_regions)
        };

        for region_id in &all_region_ids {
            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
                .await;
        }

        info!(
            "Completed GC for datanode {}: {} regions processed",
            peer,
            all_region_ids.len()
        );

        Ok(gc_report)
    }

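    /// Resolves the leader (and follower) peers for the given regions and their
    /// related regions by looking up each table's route.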
    async fn discover_datanodes_for_regions(
        &self,
        regions: &[RegionId],
    ) -> Result<(Region2Peers, Peer2Regions)> {
        let all_related_regions = self
            .find_related_regions(regions)
            .await?
            .into_iter()
            .flat_map(|(k, mut v)| {
                // Make sure the candidate region itself is resolved too.
                v.push(k);
                v
            })
            .collect_vec();
        let mut region_to_peer = HashMap::new();
        let mut peer_to_regions = HashMap::new();

        // Group regions by table so each table route is fetched only once.
        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
        for region_id in all_related_regions {
            let table_id = region_id.table_id();
            table_to_regions
                .entry(table_id)
                .or_default()
                .push(region_id);
        }

        for (table_id, table_regions) in table_to_regions {
            match self.ctx.get_table_route(table_id).await {
                Ok((_phy_table_id, table_route)) => {
                    self.get_table_regions_peer(
                        &table_route,
                        &table_regions,
                        &mut region_to_peer,
                        &mut peer_to_regions,
                    );
                }
                Err(e) => {
                    warn!(
                        "Failed to get table route for table {}: {}, skipping its regions",
                        table_id, e
                    );
                    continue;
                }
            }
        }

        Ok((region_to_peer, peer_to_regions))
    }

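    /// Records each region's leader (and followers) from the table route into the
    /// two output maps. Regions with no route entry or no leader are logged and
    /// skipped.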
    fn get_table_regions_peer(
        &self,
        table_route: &PhysicalTableRouteValue,
        table_regions: &[RegionId],
        region_to_peer: &mut Region2Peers,
        peer_to_regions: &mut Peer2Regions,
    ) {
        for &region_id in table_regions {
            let mut found = false;

            for region_route in &table_route.region_routes {
                if region_route.region.id == region_id
                    && let Some(leader_peer) = &region_route.leader_peer
                {
                    region_to_peer.insert(
                        region_id,
                        (leader_peer.clone(), region_route.follower_peers.clone()),
                    );
                    peer_to_regions
                        .entry(leader_peer.clone())
                        .or_default()
                        .insert(region_id);
                    found = true;
                    break;
                }
            }

            if !found {
                warn!(
                    "Failed to find region {} in table route or no leader peer found",
                    region_id,
                );
            }
        }
    }

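    /// Decides, per region, whether the next GC should use a full file listing:
    /// full listing is needed when the region has never been fully listed, or when
    /// `full_file_listing_interval` has elapsed since the last full listing.
    /// Regions seen for the first time are registered in the tracker and start
    /// with a fast listing.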
    async fn batch_should_use_full_listing(
        &self,
        region_ids: &[RegionId],
    ) -> HashMap<RegionId, bool> {
        let mut result = HashMap::new();
        let mut gc_tracker = self.region_gc_tracker.lock().await;
        let now = Instant::now();
        for &region_id in region_ids {
            let use_full_listing = {
                if let Some(gc_info) = gc_tracker.get(&region_id) {
                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
                        let elapsed = now.duration_since(last_full_listing);
                        elapsed >= self.config.full_file_listing_interval
                    } else {
                        // Never fully listed before.
                        true
                    }
                } else {
                    // First time seeing this region: track it and start with a fast listing.
                    gc_tracker.insert(
                        region_id,
                        RegionGcInfo {
                            last_gc_time: now,
                            last_full_listing_time: Some(now),
                        },
                    );
                    false
                }
            };
            result.insert(region_id, use_full_listing);
        }
        result
    }
}