use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::location::{self};

#[cfg(test)]
mod worker_test;

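/// Limits the number of GC jobs that can run concurrently on a datanode.
///
/// Each running [`LocalGcWorker`] holds one permit of the inner semaphore for its whole
/// lifetime, so `gc_concurrency - available_permits()` is the number of running GC tasks.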
pub struct GcLimiter {
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}

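/// Configuration for the file GC worker.
///
/// Rough field semantics, as used by [`LocalGcWorker`]:
/// - `enable`: whether GC is enabled at all.
/// - `lingering_time`: how long files recently removed from the manifest are kept on disk
///   before they become eligible for deletion; `None` disables lingering.
/// - `unknown_file_lingering_time`: grace period, based on last-modified time, for files
///   that are not tracked by the manifest at all.
/// - `max_concurrent_lister_per_gc_job`: upper bound on concurrent object-store listers
///   used when scanning a region directory.
/// - `max_concurrent_gc_job`: upper bound on concurrently running GC jobs (see [`GcLimiter`]).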
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    pub enable: bool,
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    pub max_concurrent_lister_per_gc_job: usize,
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            lingering_time: Some(Duration::from_secs(60)),
            unknown_file_lingering_time: Duration::from_secs(60 * 60),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

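/// A worker that garbage-collects unused SST and index files for a set of regions on the
/// local datanode. It holds a [`GcLimiter`] permit for as long as it is alive.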
pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    pub(crate) opt: GcConfig,
    pub(crate) file_ref_manifest: FileRefsManifest,
    _permit: OwnedSemaphorePermit,
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Roughly how many files each concurrent lister is expected to cover when
    /// scanning a region directory.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

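    /// Runs GC for a single region:
    /// 1. snapshot the current manifest to learn which files are still in use,
    /// 2. collect recently removed files together with their removal timestamps,
    /// 3. list candidate files (fast manifest-only mode or full object-store listing),
    /// 4. delete the files that are neither in use nor still lingering,
    /// 5. clear the deleted entries from the manifest's removed-file records.
    ///
    /// Returns the ids of the files that were deleted.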
    pub async fn do_region_gc(
        &self,
        region: MitoRegionRef,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        let region_id = region.region_id();

        debug!("Doing gc for region {}", region_id);
        let manifest = region.manifest_ctx.manifest().await;
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;

        if recently_removed_files.is_empty() {
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let in_used: HashSet<FileId> = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency)
            .await?;

        let unused_file_cnt = unused_files.len();

        debug!(
            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {}",
            current_files.len(),
            tmp_ref_files.len(),
            in_used.len(),
            removed_file_cnt,
            unused_files.len()
        );

        let file_pairs: Vec<(FileId, FileId)> = unused_files
            .iter()
            .map(|file_id| (*file_id, *file_id))
            .collect();

        debug!(
            "Found {} unused index files to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        self.update_manifest_removed_files(&region, unused_files.clone())
            .await?;

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        GC_DELETE_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<FileId>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        let cnt = deleted_files.len();
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

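    /// Groups the removed files recorded in the region manifest by their removal
    /// ("expel") timestamp, so callers can split them into lingering and deletable sets.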
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

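    /// Splits the listing of a region directory into up to `concurrency` ranges by hex
    /// file-name prefix, returning one lister per range together with its exclusive
    /// upper bound (`None` for the last range).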
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

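    /// Drives the partitioned listers concurrently (one spawned task per lister) and
    /// collects all file entries. Listing errors are logged and skipped rather than
    /// failing the whole GC job.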
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            e.name() < end.as_str()
                        } else {
                            true
                        }
                    }
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            e.metadata().is_file()
                        } else {
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx);

        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

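    /// Decides which of the listed files can be deleted. A file is deletable when it is
    /// not referenced by the manifest or any temporary reference, is not still lingering,
    /// and, if it is unknown to the manifest, its last-modified time is older than the
    /// unknown-file lingering cutoff. Also returns the lingering files that still exist.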
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

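    /// Computes the list of files to delete for a region.
    ///
    /// In fast mode (`full_file_listing == false`) only files recorded as removed in the
    /// manifest, and past the lingering window, are considered. In full-listing mode the
    /// whole region directory is scanned so that files unknown to the manifest can also
    /// be reclaimed after `unknown_file_lingering_time`.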
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: &HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let start = tokio::time::Instant::now();
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        if !self.full_file_listing {
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
                start.elapsed().as_secs_f64(),
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        let all_entries = self.list_region_files_concurrent(listers).await?;

        let cnt = all_entries.len();

        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
            start.elapsed().as_secs_f64(),
            region_id,
            all_unused_files_ready_for_delete.len()
        );
        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}

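/// Generates lexicographic hex boundaries that split the file-name key space into
/// `concurrency.next_power_of_two()` roughly equal ranges, e.g. a concurrency of 4
/// yields `["4", "8", "c"]` and 32 yields `["08", "10", ..., "f8"]`.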
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}