use std::collections::{HashMap, HashSet};
use std::time::Instant;

use common_catalog::consts::MITO_ENGINE;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use ordered_float::OrderedFloat;
use store_api::region_engine::RegionRole;
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;

use crate::error::Result;
use crate::gc::Region2Peers;
use crate::gc::candidate::GcCandidate;
use crate::gc::dropped::DroppedRegionCollector;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;

impl GcScheduler {
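    /// Runs a single GC cycle: fetches region stats, selects GC candidates,
    /// merges in already-dropped regions, and dispatches the work to the
    /// affected datanodes. Returns a [`GcJobReport`] summarizing the results.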
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;

        if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let mut datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let report = self
            .parallel_process_datanodes(
                datanode_to_candidates,
                dropped_assignment.force_full_listing,
                dropped_assignment.region_routes_override,
            )
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }

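    /// Appends dropped-region candidates from the assignment to the
    /// per-datanode candidate lists, creating an entry for a peer if needed.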
    fn merge_dropped_regions(
        &self,
        datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        assignment: &crate::gc::dropped::DroppedRegionAssignment,
    ) {
        for (peer, dropped_infos) in &assignment.regions_by_peer {
            let entry = datanode_to_candidates.entry(peer.clone()).or_default();
            for info in dropped_infos {
                entry.push((info.table_id, dropped_candidate(info.region_id)));
            }
        }
    }

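    /// Regroups per-table GC candidates by the datanode hosting each region's
    /// leader peer. Tables whose route resolves to a different physical table
    /// are skipped, as are regions without a known leader peer.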
    pub(crate) async fn aggregate_candidates_by_datanode(
        &self,
        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();

        for (table_id, candidates) in per_table_candidates {
            if candidates.is_empty() {
                continue;
            }

            let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;

            if phy_table_id != table_id {
                continue;
            }

            let region_to_peer = table_peer
                .region_routes
                .iter()
                .filter_map(|r| {
                    r.leader_peer
                        .as_ref()
                        .map(|peer| (r.region.id, peer.clone()))
                })
                .collect::<HashMap<RegionId, Peer>>();

            for candidate in candidates {
                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
                    datanode_to_candidates
                        .entry(peer.clone())
                        .or_default()
                        .push((table_id, candidate));
                } else {
                    warn!(
                        "Skipping region {} for table {}: no leader peer found",
                        candidate.region_id, table_id
                    );
                }
            }
        }

        info!(
            "Aggregated GC candidates for {} datanodes",
            datanode_to_candidates.len()
        );
        Ok(datanode_to_candidates)
    }

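    /// Processes the per-datanode candidate lists concurrently, bounded by
    /// `max_concurrent_tables`, and folds the per-datanode outcomes into a
    /// single [`GcJobReport`].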
    pub(crate) async fn parallel_process_datanodes(
        &self,
        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
        region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
    ) -> GcJobReport {
        let mut report = GcJobReport::default();

        let results: Vec<_> = futures::stream::iter(
            datanode_to_candidates
                .into_iter()
                .filter(|(_, candidates)| !candidates.is_empty()),
        )
        .map(|(peer, candidates)| {
            let scheduler = self;
            let peer_clone = peer.clone();
            let force_full_listing = force_full_listing_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            let region_routes_override = region_routes_override_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            async move {
                (
                    peer,
                    scheduler
                        .process_datanode_gc(
                            peer_clone,
                            candidates,
                            force_full_listing,
                            region_routes_override,
                        )
                        .await,
                )
            }
        })
        .buffer_unordered(self.config.max_concurrent_tables)
        .collect()
        .await;

        for (peer, result) in results {
            match result {
                Ok(dn_report) => {
                    report.per_datanode_reports.insert(peer.id, dn_report);
                }
                Err(e) => {
                    error!(e; "Failed to process datanode GC for peer {}", peer);
                    report.failed_datanodes.entry(peer.id).or_default().push(e);
                }
            }
        }

        report
    }

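    /// Runs GC for a single datanode: splits its candidate regions into
    /// fast-listing and full-listing batches, issues one `gc_regions` call per
    /// batch, marks failed regions for retry, and records which regions
    /// completed a full listing.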
    pub(crate) async fn process_datanode_gc(
        &self,
        peer: Peer,
        candidates: Vec<(TableId, GcCandidate)>,
        force_full_listing: HashSet<RegionId>,
        region_routes_override: Region2Peers,
    ) -> Result<GcReport> {
        info!(
            "Starting GC for datanode {} with {} candidate regions",
            peer,
            candidates.len()
        );

        if candidates.is_empty() {
            return Ok(Default::default());
        }

        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();

        let (gc_report, fully_listed_regions) = {
            let batch_full_listing_decisions = self
                .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
                .await;

            let need_full_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();
            let fast_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if !need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();

            let mut combined_report = GcReport::default();

            if !fast_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &fast_list_regions,
                        false,
                        self.config.mailbox_timeout,
                        region_routes_override.clone(),
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            fast_list_regions, peer,
                        );

                        combined_report
                            .need_retry_regions
                            .extend(fast_list_regions.clone().into_iter());
                    }
                }
            }

            if !need_full_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &need_full_list_regions,
                        true,
                        self.config.mailbox_timeout,
                        region_routes_override,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            need_full_list_regions, peer,
                        );

                        combined_report
                            .need_retry_regions
                            .extend(need_full_list_regions.clone());
                    }
                }
            }
            let fully_listed_regions = need_full_list_regions
                .into_iter()
                .filter(|r| !combined_report.need_retry_regions.contains(r))
                .collect::<HashSet<_>>();

            (combined_report, fully_listed_regions)
        };

        for region_id in &all_region_ids {
            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
                .await;
        }

        info!(
            "Completed GC for datanode {}: {} regions processed",
            peer,
            all_region_ids.len()
        );

        Ok(gc_report)
    }

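    /// Decides, per region, whether the next GC should use full file listing.
    /// Regions in `force_full_listing` always do (and their tracker entry is
    /// refreshed); regions never tracked before are registered and fast-listed;
    /// otherwise full listing is used once `full_file_listing_interval` has
    /// elapsed since the previous full listing.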
    async fn batch_should_use_full_listing(
        &self,
        region_ids: &[RegionId],
        force_full_listing: &HashSet<RegionId>,
    ) -> HashMap<RegionId, bool> {
        let mut result = HashMap::new();
        let mut gc_tracker = self.region_gc_tracker.lock().await;
        let now = Instant::now();
        for &region_id in region_ids {
            if force_full_listing.contains(&region_id) {
                gc_tracker
                    .entry(region_id)
                    .and_modify(|info| {
                        info.last_full_listing_time = Some(now);
                        info.last_gc_time = now;
                    })
                    .or_insert_with(|| RegionGcInfo {
                        last_gc_time: now,
                        last_full_listing_time: Some(now),
                    });
                result.insert(region_id, true);
                continue;
            }
            let use_full_listing = {
                if let Some(gc_info) = gc_tracker.get(&region_id) {
                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
                        let elapsed = now.saturating_duration_since(last_full_listing);
                        elapsed >= self.config.full_file_listing_interval
                    } else {
                        true
                    }
                } else {
                    gc_tracker.insert(
                        region_id,
                        RegionGcInfo {
                            last_gc_time: now,
                            last_full_listing_time: Some(now),
                        },
                    );
                    false
                }
            };
            result.insert(region_id, use_full_listing);
        }
        result
    }
}

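/// Builds a zero-score [`GcCandidate`] for an already-dropped region, backed
/// by a placeholder [`RegionStat`].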
fn dropped_candidate(region_id: RegionId) -> GcCandidate {
    GcCandidate {
        region_id,
        score: OrderedFloat(0.0),
        region_stat: dropped_region_stat(region_id),
    }
}

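/// Builds a placeholder [`RegionStat`] with all counters zeroed for a dropped
/// region.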
fn dropped_region_stat(region_id: RegionId) -> RegionStat {
    RegionStat {
        id: region_id,
        rcus: 0,
        wcus: 0,
        approximate_bytes: 0,
        engine: MITO_ENGINE.to_string(),
        role: RegionRole::Leader,
        num_rows: 0,
        memtable_size: 0,
        manifest_size: 0,
        sst_size: 0,
        sst_num: 0,
        index_size: 0,
        region_manifest: RegionManifestInfo::Mito {
            manifest_version: 0,
            flushed_entry_id: 0,
            file_removed_cnt: 0,
        },
        written_bytes: 0,
        data_topic_latest_entry_id: 0,
        metadata_topic_latest_entry_id: 0,
    }
}