1use std::collections::{HashMap, HashSet};
16use std::time::Instant;
17
18use common_catalog::consts::MITO_ENGINE;
19use common_meta::datanode::{RegionManifestInfo, RegionStat};
20use common_meta::peer::Peer;
21use common_telemetry::tracing::Instrument as _;
22use common_telemetry::{debug, error, info, warn};
23use futures::StreamExt;
24use itertools::Itertools;
25use ordered_float::OrderedFloat;
26use store_api::region_engine::RegionRole;
27use store_api::storage::{GcReport, RegionId};
28use table::metadata::TableId;
29
30use crate::error::Result;
31use crate::gc::Region2Peers;
32use crate::gc::candidate::GcCandidate;
33use crate::gc::dropped::DroppedRegionCollector;
34use crate::gc::scheduler::{GcJobReport, GcScheduler};
35use crate::gc::tracker::RegionGcInfo;
36use crate::metrics::METRIC_META_GC_CANDIDATE_REGIONS;
37
38impl GcScheduler {
39 pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
40 let start_time = Instant::now();
41 info!("Starting GC cycle");
42
43 let table_to_region_stats = self
45 .ctx
46 .get_table_to_region_stats()
47 .instrument(common_telemetry::tracing::info_span!(
48 "meta_gc_get_region_stats"
49 ))
50 .await?;
51 info!(
52 "Fetched region stats for {} tables",
53 table_to_region_stats.len()
54 );
55
56 let per_table_candidates = self
57 .select_gc_candidates(&table_to_region_stats)
58 .instrument(common_telemetry::tracing::info_span!(
59 "meta_gc_select_candidates"
60 ))
61 .await?;
62
63 let table_reparts = self
64 .ctx
65 .get_table_reparts()
66 .instrument(common_telemetry::tracing::info_span!(
67 "meta_gc_get_table_reparts"
68 ))
69 .await?;
70 let dropped_collector =
71 DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
72 let dropped_assignment = dropped_collector
73 .collect_and_assign(&table_reparts)
74 .instrument(common_telemetry::tracing::info_span!(
75 "meta_gc_collect_dropped_regions"
76 ))
77 .await?;
78 let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum();
79 let dropped_count: usize = dropped_assignment
80 .regions_by_peer
81 .values()
82 .map(|regions| regions.len())
83 .sum();
84 METRIC_META_GC_CANDIDATE_REGIONS.set((candidate_count + dropped_count) as i64);
85
86 if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
87 info!("No GC candidates found, skipping GC cycle");
88 return Ok(Default::default());
89 }
90
91 let mut datanode_to_candidates = self
92 .aggregate_candidates_by_datanode(per_table_candidates)
93 .instrument(common_telemetry::tracing::info_span!(
94 "meta_gc_aggregate_by_datanode"
95 ))
96 .await?;
97
98 self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);
99
100 if datanode_to_candidates.is_empty() {
101 info!("No valid datanode candidates found, skipping GC cycle");
102 return Ok(Default::default());
103 }
104
105 let report = self
106 .parallel_process_datanodes(
107 datanode_to_candidates,
108 dropped_assignment.force_full_listing,
109 dropped_assignment.region_routes_override,
110 )
111 .instrument(common_telemetry::tracing::info_span!(
112 "meta_gc_dispatch_to_datanodes"
113 ))
114 .await;
115
116 let duration = start_time.elapsed();
117 match &report {
118 GcJobReport::PerDatanode {
119 per_datanode_reports,
120 failed_datanodes,
121 } => {
122 info!(
123 "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
124 per_datanode_reports.len(),
125 failed_datanodes.len(),
126 duration
127 );
128 }
129 GcJobReport::Combined { report } => {
130 info!(
131 "Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}",
132 report.deleted_files.len(),
133 report.deleted_indexes.len(),
134 duration
135 );
136 }
137 }
138 debug!("Detailed GC Job Report: {report:#?}");
139
140 Ok(report)
141 }
142
143 fn merge_dropped_regions(
144 &self,
145 datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
146 assignment: &crate::gc::dropped::DroppedRegionAssignment,
147 ) {
148 for (peer, dropped_infos) in &assignment.regions_by_peer {
149 let entry = datanode_to_candidates.entry(peer.clone()).or_default();
150 for info in dropped_infos {
151 entry.push((info.table_id, dropped_candidate(info.region_id)));
152 }
153 }
154 }
155
156 pub(crate) async fn aggregate_candidates_by_datanode(
157 &self,
158 per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
159 ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
160 let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();
161
162 for (table_id, candidates) in per_table_candidates {
163 if candidates.is_empty() {
164 continue;
165 }
166
167 let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
169
170 if phy_table_id != table_id {
171 continue;
173 }
174
175 let region_to_peer = table_peer
176 .region_routes
177 .iter()
178 .filter_map(|r| {
179 r.leader_peer
180 .as_ref()
181 .map(|peer| (r.region.id, peer.clone()))
182 })
183 .collect::<HashMap<RegionId, Peer>>();
184
185 for candidate in candidates {
186 if let Some(peer) = region_to_peer.get(&candidate.region_id) {
187 datanode_to_candidates
188 .entry(peer.clone())
189 .or_default()
190 .push((table_id, candidate));
191 } else {
192 warn!(
193 "Skipping region {} for table {}: no leader peer found",
194 candidate.region_id, table_id
195 );
196 }
197 }
198 }
199
200 info!(
201 "Aggregated GC candidates for {} datanodes",
202 datanode_to_candidates.len()
203 );
204 Ok(datanode_to_candidates)
205 }
206
207 pub(crate) async fn parallel_process_datanodes(
209 &self,
210 datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
211 force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
212 region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
213 ) -> GcJobReport {
214 let mut per_datanode_reports = HashMap::new();
215 let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new();
216
217 let results: Vec<_> = futures::stream::iter(
219 datanode_to_candidates
220 .into_iter()
221 .filter(|(_, candidates)| !candidates.is_empty()),
222 )
223 .map(|(peer, candidates)| {
224 let scheduler = self;
225 let peer_clone = peer.clone();
226 let force_full_listing = force_full_listing_by_peer
227 .get(&peer)
228 .cloned()
229 .unwrap_or_default();
230 let region_routes_override = region_routes_override_by_peer
231 .get(&peer)
232 .cloned()
233 .unwrap_or_default();
234 async move {
235 (
236 peer,
237 scheduler
238 .process_datanode_gc(
239 peer_clone,
240 candidates,
241 force_full_listing,
242 region_routes_override,
243 )
244 .await,
245 )
246 }
247 })
248 .buffer_unordered(self.config.max_concurrent_tables) .collect()
250 .await;
251
252 for (peer, result) in results {
254 match result {
255 Ok(dn_report) => {
256 per_datanode_reports.insert(peer.id, dn_report);
257 }
258 Err(e) => {
259 error!(e; "Failed to process datanode GC for peer {}", peer);
260 failed_datanodes.entry(peer.id).or_default().push(e);
263 }
264 }
265 }
266
267 GcJobReport::PerDatanode {
268 per_datanode_reports,
269 failed_datanodes,
270 }
271 }
272
273 pub(crate) async fn process_datanode_gc(
276 &self,
277 peer: Peer,
278 candidates: Vec<(TableId, GcCandidate)>,
279 force_full_listing: HashSet<RegionId>,
280 region_routes_override: Region2Peers,
281 ) -> Result<GcReport> {
282 info!(
283 "Starting GC for datanode {} with {} candidate regions",
284 peer,
285 candidates.len()
286 );
287
288 if candidates.is_empty() {
289 return Ok(Default::default());
290 }
291
292 let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
293
294 let (gc_report, fully_listed_regions) = {
296 let batch_full_listing_decisions = self
299 .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
300 .await;
301
302 let need_full_list_regions = batch_full_listing_decisions
303 .iter()
304 .filter_map(
305 |(®ion_id, &need_full)| {
306 if need_full { Some(region_id) } else { None }
307 },
308 )
309 .collect_vec();
310 let fast_list_regions = batch_full_listing_decisions
311 .iter()
312 .filter_map(
313 |(®ion_id, &need_full)| {
314 if !need_full { Some(region_id) } else { None }
315 },
316 )
317 .collect_vec();
318
319 let mut combined_report = GcReport::default();
320
321 if !fast_list_regions.is_empty() {
323 match self
324 .ctx
325 .gc_regions(
326 &fast_list_regions,
327 false,
328 self.config.mailbox_timeout,
329 region_routes_override.clone(),
330 )
331 .instrument(common_telemetry::tracing::info_span!(
332 "meta_gc_call_datanode",
333 peer = %peer,
334 mode = "fast",
335 region_count = fast_list_regions.len()
336 ))
337 .await
338 {
339 Ok(report) => combined_report.merge(report),
340 Err(e) => {
341 error!(
342 e; "Failed to GC regions {:?} on datanode {}",
343 fast_list_regions, peer,
344 );
345
346 combined_report
348 .need_retry_regions
349 .extend(fast_list_regions.clone().into_iter());
350 }
351 }
352 }
353
354 if !need_full_list_regions.is_empty() {
355 match self
356 .ctx
357 .gc_regions(
358 &need_full_list_regions,
359 true,
360 self.config.mailbox_timeout,
361 region_routes_override,
362 )
363 .instrument(common_telemetry::tracing::info_span!(
364 "meta_gc_call_datanode",
365 peer = %peer,
366 mode = "full",
367 region_count = need_full_list_regions.len()
368 ))
369 .await
370 {
371 Ok(report) => combined_report.merge(report),
372 Err(e) => {
373 error!(
374 e; "Failed to GC regions {:?} on datanode {}",
375 need_full_list_regions, peer,
376 );
377
378 combined_report
380 .need_retry_regions
381 .extend(need_full_list_regions.clone());
382 }
383 }
384 }
385 let fully_listed_regions = need_full_list_regions
386 .into_iter()
387 .filter(|r| !combined_report.need_retry_regions.contains(r))
388 .collect::<HashSet<_>>();
389
390 (combined_report, fully_listed_regions)
391 };
392
393 for region_id in &all_region_ids {
395 self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
396 .await;
397 }
398
399 info!(
400 "Completed GC for datanode {}: {} regions processed",
401 peer,
402 all_region_ids.len()
403 );
404
405 Ok(gc_report)
406 }
407
408 async fn batch_should_use_full_listing(
409 &self,
410 region_ids: &[RegionId],
411 force_full_listing: &HashSet<RegionId>,
412 ) -> HashMap<RegionId, bool> {
413 let mut result = HashMap::new();
414 let mut gc_tracker = self.region_gc_tracker.lock().await;
415 let now = Instant::now();
416 for ®ion_id in region_ids {
417 if force_full_listing.contains(®ion_id) {
418 gc_tracker
419 .entry(region_id)
420 .and_modify(|info| {
421 info.last_full_listing_time = Some(now);
422 info.last_gc_time = now;
423 })
424 .or_insert_with(|| RegionGcInfo {
425 last_gc_time: now,
426 last_full_listing_time: Some(now),
427 });
428 result.insert(region_id, true);
429 continue;
430 }
431 let use_full_listing = {
432 if let Some(gc_info) = gc_tracker.get(®ion_id) {
433 if let Some(last_full_listing) = gc_info.last_full_listing_time {
434 let elapsed = now.saturating_duration_since(last_full_listing);
436 elapsed >= self.config.full_file_listing_interval
437 } else {
438 true
440 }
441 } else {
442 gc_tracker.insert(
444 region_id,
445 RegionGcInfo {
446 last_gc_time: now,
447 last_full_listing_time: Some(now),
448 },
449 );
450 false
451 }
452 };
453 result.insert(region_id, use_full_listing);
454 }
455 result
456 }
457}
458
459fn dropped_candidate(region_id: RegionId) -> GcCandidate {
460 GcCandidate {
461 region_id,
462 score: OrderedFloat(0.0),
463 region_stat: dropped_region_stat(region_id),
464 }
465}
466
467fn dropped_region_stat(region_id: RegionId) -> RegionStat {
468 RegionStat {
469 id: region_id,
470 rcus: 0,
471 wcus: 0,
472 approximate_bytes: 0,
473 engine: MITO_ENGINE.to_string(),
474 role: RegionRole::Leader,
475 num_rows: 0,
476 memtable_size: 0,
477 manifest_size: 0,
478 sst_size: 0,
479 sst_num: 0,
480 index_size: 0,
481 region_manifest: RegionManifestInfo::Mito {
482 manifest_version: 0,
483 flushed_entry_id: 0,
484 file_removed_cnt: 0,
485 },
486 written_bytes: 0,
487 data_topic_latest_entry_id: 0,
488 metadata_topic_latest_entry_id: 0,
489 }
490}