1use std::collections::{HashMap, HashSet};
16use std::time::Instant;
17
18use common_catalog::consts::MITO_ENGINE;
19use common_meta::datanode::{RegionManifestInfo, RegionStat};
20use common_meta::peer::Peer;
21use common_telemetry::{debug, error, info, warn};
22use futures::StreamExt;
23use itertools::Itertools;
24use ordered_float::OrderedFloat;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{GcReport, RegionId};
27use table::metadata::TableId;
28
29use crate::error::Result;
30use crate::gc::Region2Peers;
31use crate::gc::candidate::GcCandidate;
32use crate::gc::dropped::DroppedRegionCollector;
33use crate::gc::scheduler::{GcJobReport, GcScheduler};
34use crate::gc::tracker::RegionGcInfo;
35use crate::metrics::METRIC_META_GC_CANDIDATE_REGIONS;
36
impl GcScheduler {
    /// Runs one complete GC cycle.
    ///
    /// Steps: fetch per-table region stats, select GC candidates, collect
    /// dropped-region work, merge both candidate sources per datanode, then
    /// dispatch GC to each datanode concurrently and return the aggregated
    /// [`GcJobReport`].
    ///
    /// Returns early with a default (empty) report when there is nothing to do.
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;

        // Export the total candidate count (regular + dropped) as a gauge.
        let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum();
        let dropped_count: usize = dropped_assignment
            .regions_by_peer
            .values()
            .map(|regions| regions.len())
            .sum();
        METRIC_META_GC_CANDIDATE_REGIONS.set((candidate_count + dropped_count) as i64);

        if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let mut datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let report = self
            .parallel_process_datanodes(
                datanode_to_candidates,
                dropped_assignment.force_full_listing,
                dropped_assignment.region_routes_override,
            )
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }
98
99 fn merge_dropped_regions(
100 &self,
101 datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
102 assignment: &crate::gc::dropped::DroppedRegionAssignment,
103 ) {
104 for (peer, dropped_infos) in &assignment.regions_by_peer {
105 let entry = datanode_to_candidates.entry(peer.clone()).or_default();
106 for info in dropped_infos {
107 entry.push((info.table_id, dropped_candidate(info.region_id)));
108 }
109 }
110 }
111
112 pub(crate) async fn aggregate_candidates_by_datanode(
113 &self,
114 per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
115 ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
116 let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();
117
118 for (table_id, candidates) in per_table_candidates {
119 if candidates.is_empty() {
120 continue;
121 }
122
123 let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
125
126 if phy_table_id != table_id {
127 continue;
129 }
130
131 let region_to_peer = table_peer
132 .region_routes
133 .iter()
134 .filter_map(|r| {
135 r.leader_peer
136 .as_ref()
137 .map(|peer| (r.region.id, peer.clone()))
138 })
139 .collect::<HashMap<RegionId, Peer>>();
140
141 for candidate in candidates {
142 if let Some(peer) = region_to_peer.get(&candidate.region_id) {
143 datanode_to_candidates
144 .entry(peer.clone())
145 .or_default()
146 .push((table_id, candidate));
147 } else {
148 warn!(
149 "Skipping region {} for table {}: no leader peer found",
150 candidate.region_id, table_id
151 );
152 }
153 }
154 }
155
156 info!(
157 "Aggregated GC candidates for {} datanodes",
158 datanode_to_candidates.len()
159 );
160 Ok(datanode_to_candidates)
161 }
162
163 pub(crate) async fn parallel_process_datanodes(
165 &self,
166 datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
167 force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
168 region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
169 ) -> GcJobReport {
170 let mut report = GcJobReport::default();
171
172 let results: Vec<_> = futures::stream::iter(
174 datanode_to_candidates
175 .into_iter()
176 .filter(|(_, candidates)| !candidates.is_empty()),
177 )
178 .map(|(peer, candidates)| {
179 let scheduler = self;
180 let peer_clone = peer.clone();
181 let force_full_listing = force_full_listing_by_peer
182 .get(&peer)
183 .cloned()
184 .unwrap_or_default();
185 let region_routes_override = region_routes_override_by_peer
186 .get(&peer)
187 .cloned()
188 .unwrap_or_default();
189 async move {
190 (
191 peer,
192 scheduler
193 .process_datanode_gc(
194 peer_clone,
195 candidates,
196 force_full_listing,
197 region_routes_override,
198 )
199 .await,
200 )
201 }
202 })
203 .buffer_unordered(self.config.max_concurrent_tables) .collect()
205 .await;
206
207 for (peer, result) in results {
209 match result {
210 Ok(dn_report) => {
211 report.per_datanode_reports.insert(peer.id, dn_report);
212 }
213 Err(e) => {
214 error!(e; "Failed to process datanode GC for peer {}", peer);
215 report.failed_datanodes.entry(peer.id).or_default().push(e);
218 }
219 }
220 }
221
222 report
223 }
224
225 pub(crate) async fn process_datanode_gc(
228 &self,
229 peer: Peer,
230 candidates: Vec<(TableId, GcCandidate)>,
231 force_full_listing: HashSet<RegionId>,
232 region_routes_override: Region2Peers,
233 ) -> Result<GcReport> {
234 info!(
235 "Starting GC for datanode {} with {} candidate regions",
236 peer,
237 candidates.len()
238 );
239
240 if candidates.is_empty() {
241 return Ok(Default::default());
242 }
243
244 let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
245
246 let (gc_report, fully_listed_regions) = {
248 let batch_full_listing_decisions = self
251 .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
252 .await;
253
254 let need_full_list_regions = batch_full_listing_decisions
255 .iter()
256 .filter_map(
257 |(®ion_id, &need_full)| {
258 if need_full { Some(region_id) } else { None }
259 },
260 )
261 .collect_vec();
262 let fast_list_regions = batch_full_listing_decisions
263 .iter()
264 .filter_map(
265 |(®ion_id, &need_full)| {
266 if !need_full { Some(region_id) } else { None }
267 },
268 )
269 .collect_vec();
270
271 let mut combined_report = GcReport::default();
272
273 if !fast_list_regions.is_empty() {
275 match self
276 .ctx
277 .gc_regions(
278 &fast_list_regions,
279 false,
280 self.config.mailbox_timeout,
281 region_routes_override.clone(),
282 )
283 .await
284 {
285 Ok(report) => combined_report.merge(report),
286 Err(e) => {
287 error!(
288 e; "Failed to GC regions {:?} on datanode {}",
289 fast_list_regions, peer,
290 );
291
292 combined_report
294 .need_retry_regions
295 .extend(fast_list_regions.clone().into_iter());
296 }
297 }
298 }
299
300 if !need_full_list_regions.is_empty() {
301 match self
302 .ctx
303 .gc_regions(
304 &need_full_list_regions,
305 true,
306 self.config.mailbox_timeout,
307 region_routes_override,
308 )
309 .await
310 {
311 Ok(report) => combined_report.merge(report),
312 Err(e) => {
313 error!(
314 e; "Failed to GC regions {:?} on datanode {}",
315 need_full_list_regions, peer,
316 );
317
318 combined_report
320 .need_retry_regions
321 .extend(need_full_list_regions.clone());
322 }
323 }
324 }
325 let fully_listed_regions = need_full_list_regions
326 .into_iter()
327 .filter(|r| !combined_report.need_retry_regions.contains(r))
328 .collect::<HashSet<_>>();
329
330 (combined_report, fully_listed_regions)
331 };
332
333 for region_id in &all_region_ids {
335 self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
336 .await;
337 }
338
339 info!(
340 "Completed GC for datanode {}: {} regions processed",
341 peer,
342 all_region_ids.len()
343 );
344
345 Ok(gc_report)
346 }
347
348 async fn batch_should_use_full_listing(
349 &self,
350 region_ids: &[RegionId],
351 force_full_listing: &HashSet<RegionId>,
352 ) -> HashMap<RegionId, bool> {
353 let mut result = HashMap::new();
354 let mut gc_tracker = self.region_gc_tracker.lock().await;
355 let now = Instant::now();
356 for ®ion_id in region_ids {
357 if force_full_listing.contains(®ion_id) {
358 gc_tracker
359 .entry(region_id)
360 .and_modify(|info| {
361 info.last_full_listing_time = Some(now);
362 info.last_gc_time = now;
363 })
364 .or_insert_with(|| RegionGcInfo {
365 last_gc_time: now,
366 last_full_listing_time: Some(now),
367 });
368 result.insert(region_id, true);
369 continue;
370 }
371 let use_full_listing = {
372 if let Some(gc_info) = gc_tracker.get(®ion_id) {
373 if let Some(last_full_listing) = gc_info.last_full_listing_time {
374 let elapsed = now.saturating_duration_since(last_full_listing);
376 elapsed >= self.config.full_file_listing_interval
377 } else {
378 true
380 }
381 } else {
382 gc_tracker.insert(
384 region_id,
385 RegionGcInfo {
386 last_gc_time: now,
387 last_full_listing_time: Some(now),
388 },
389 );
390 false
391 }
392 };
393 result.insert(region_id, use_full_listing);
394 }
395 result
396 }
397}
398
399fn dropped_candidate(region_id: RegionId) -> GcCandidate {
400 GcCandidate {
401 region_id,
402 score: OrderedFloat(0.0),
403 region_stat: dropped_region_stat(region_id),
404 }
405}
406
407fn dropped_region_stat(region_id: RegionId) -> RegionStat {
408 RegionStat {
409 id: region_id,
410 rcus: 0,
411 wcus: 0,
412 approximate_bytes: 0,
413 engine: MITO_ENGINE.to_string(),
414 role: RegionRole::Leader,
415 num_rows: 0,
416 memtable_size: 0,
417 manifest_size: 0,
418 sst_size: 0,
419 sst_num: 0,
420 index_size: 0,
421 region_manifest: RegionManifestInfo::Mito {
422 manifest_version: 0,
423 flushed_entry_id: 0,
424 file_removed_cnt: 0,
425 },
426 written_bytes: 0,
427 data_topic_latest_entry_id: 0,
428 metadata_topic_latest_entry_id: 0,
429 }
430}