1use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::tracing::Instrument as _;
30use common_telemetry::{debug, error, info, warn};
31use common_time::Timestamp;
32use itertools::Itertools;
33use object_store::{Entry, Lister};
34use serde::{Deserialize, Serialize};
35use snafu::{ResultExt as _, ensure};
36use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
37use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
38use tokio_stream::StreamExt;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheManagerRef;
42use crate::cache::file_cache::FileType;
43use crate::config::MitoConfig;
44use crate::error::{
45 DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
46 TooManyGcJobsSnafu, UnexpectedSnafu,
47};
48use crate::manifest::action::{RegionManifest, RemovedFile};
49use crate::metrics::{
50 GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
51 GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
52};
53use crate::region::{MitoRegionRef, RegionRoleState};
54use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
55use crate::sst::location::{self};
56
57#[cfg(test)]
58mod worker_test;
59
60fn should_delete_file(
63 is_in_manifest: bool,
64 is_in_tmp_ref: bool,
65 is_linger: bool,
66 is_eligible_for_delete: bool,
67 is_region_dropped: bool,
68 _entry: &Entry,
69 _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71 let is_known = is_linger || is_eligible_for_delete;
72
73 !is_in_manifest
74 && !is_in_tmp_ref
75 && if is_known {
76 is_eligible_for_delete
77 } else {
78 !is_in_tmp_ref && is_region_dropped
79 }
80}
81
/// Limits how many GC jobs may run concurrently on this datanode.
pub struct GcLimiter {
    /// Semaphore whose permits bound concurrent GC jobs; each running job
    /// holds one permit (see [`GcLimiter::permit`]) for its whole lifetime.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    // Total number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared reference to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91 pub fn new(gc_concurrency: usize) -> Self {
92 Self {
93 gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94 gc_concurrency,
95 }
96 }
97
98 pub fn running_gc_tasks(&self) -> u32 {
99 (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100 }
101
102 pub fn gc_concurrency(&self) -> u32 {
103 self.gc_concurrency as u32
104 }
105
106 pub fn gc_stat(&self) -> GcStat {
107 GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108 }
109
110 pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114 self.gc_job_limit
115 .clone()
116 .try_acquire_owned()
117 .map_err(|e| match e {
118 TryAcquireError::Closed => UnexpectedSnafu {
119 reason: format!("Failed to acquire gc permit: {e}"),
120 }
121 .build(),
122 TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123 })
124 }
125}
126
/// Configuration for the SST garbage collector.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// How long a file removed from the manifest lingers before it becomes
    /// eligible for deletion; `None` disables lingering entirely.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering period for files that cannot be attributed to the manifest's
    /// removed-files records (currently forwarded to the deletion policy but
    /// unused there — see `should_delete_file`).
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent object-store listers used by one GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of GC jobs allowed to run concurrently.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154 fn default() -> Self {
155 Self {
156 enable: false,
157 lingering_time: Some(Duration::from_secs(60)),
159 unknown_file_lingering_time: Duration::from_secs(60 * 60),
161 max_concurrent_lister_per_gc_job: 32,
162 max_concurrent_gc_job: 4,
163 }
164 }
165}
166
/// A worker that garbage-collects unused SST and index files for a set of
/// regions (all belonging to the same table) on the local datanode.
pub struct LocalGcWorker {
    /// Access layer used to list and delete files on the object store.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional caches to purge when files are deleted.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC; a `None` value means the region was already dropped.
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC configuration options.
    pub(crate) opt: GcConfig,
    /// Snapshot of temporary file references (and the manifest versions they
    /// were taken against) used to avoid deleting files still in use.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Concurrency permit held for this worker's entire lifetime.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform a full object-store listing (slower but required for
    /// dropped regions and for finding unknown/orphaned files).
    pub full_file_listing: bool,
}
189
/// The subset of engine configuration needed to open a region manifest.
pub struct ManifestOpenConfig {
    /// Whether the manifest files are compressed.
    pub compress_manifest: bool,
    /// Number of manifest actions between checkpoints.
    pub manifest_checkpoint_distance: u64,
    /// Max number of removed-file records kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// TTL of removed-file records kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198 fn from(mito_config: MitoConfig) -> Self {
199 Self {
200 compress_manifest: mito_config.compress_manifest,
201 manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202 experimental_manifest_keep_removed_file_count: mito_config
203 .experimental_manifest_keep_removed_file_count,
204 experimental_manifest_keep_removed_file_ttl: mito_config
205 .experimental_manifest_keep_removed_file_ttl,
206 }
207 }
208}
209
210impl LocalGcWorker {
211 #[allow(clippy::too_many_arguments)]
215 pub async fn try_new(
216 access_layer: AccessLayerRef,
217 cache_manager: Option<CacheManagerRef>,
218 regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
219 opt: GcConfig,
220 file_ref_manifest: FileRefsManifest,
221 limiter: &GcLimiterRef,
222 full_file_listing: bool,
223 ) -> Result<Self> {
224 if let Some(first_region_id) = regions_to_gc.keys().next() {
225 let table_id = first_region_id.table_id();
226 for region_id in regions_to_gc.keys() {
227 ensure!(
228 region_id.table_id() == table_id,
229 InvalidRequestSnafu {
230 region_id: *region_id,
231 reason: format!(
232 "Region {} does not belong to table {}",
233 region_id, table_id
234 ),
235 }
236 );
237 }
238 }
239
240 let permit = limiter.permit()?;
241
242 Ok(Self {
243 access_layer,
244 cache_manager,
245 regions: regions_to_gc,
246 opt,
247 file_ref_manifest,
248 _permit: permit,
249 full_file_listing,
250 })
251 }
252
253 pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
255 let mut tmp_ref_files = HashMap::new();
256 for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
257 tmp_ref_files
258 .entry(*region_id)
259 .or_insert_with(HashSet::new)
260 .extend(file_refs.clone());
261 }
263
264 Ok(tmp_ref_files)
265 }
266
    /// Runs GC sequentially over every region of this worker and aggregates
    /// the deleted data/index files into a [`GcReport`].
    ///
    /// Returns an error (abandoning any partial report) if a region is found
    /// in follower state, since GC must only run on leader regions.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
    )]
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let _timer = GC_DURATION_SECONDS
            .with_label_values(&["total"])
            .start_timer();
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let mut deleted_indexes = HashMap::new();
        // Per-region temporary references, grouped from the snapshot manifest.
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            // Refuse to GC follower replicas: only the leader owns the files.
            if region.as_ref().map(|r| r.manifest_ctx.current_state())
                == Some(RegionRoleState::Follower)
            {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            // Shadow with this region's own tmp refs (empty set if none).
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self
                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
                .await?;
            // Split out index deletions as (file_id, index_version) pairs.
            let index_files = files
                .iter()
                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
                .collect_vec();
            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
            deleted_indexes.insert(*region_id, index_files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            deleted_indexes,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
329}
330
331impl LocalGcWorker {
    /// Target number of files per concurrent lister; used to size the lister
    /// concurrency when doing a full object-store listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
335
336 #[common_telemetry::tracing::instrument(
345 skip_all,
346 fields(
347 region_id = %region_id,
348 full_file_listing = self.full_file_listing,
349 region_present = region.is_some()
350 )
351 )]
352 pub async fn do_region_gc(
353 &self,
354 region_id: RegionId,
355 region: Option<MitoRegionRef>,
356 tmp_ref_files: &HashSet<FileRef>,
357 ) -> Result<Vec<RemovedFile>> {
358 let mode = if self.full_file_listing {
359 "full_listing"
360 } else {
361 "fast"
362 };
363 GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
364 debug!(
365 "Doing gc for region {}, {}",
366 region_id,
367 if region.is_some() {
368 "region found"
369 } else {
370 "region not found, might be dropped"
371 }
372 );
373
374 ensure!(
375 region.is_some() || self.full_file_listing,
376 InvalidRequestSnafu {
377 region_id,
378 reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
379 }
380 );
381
382 let manifest = if let Some(region) = ®ion {
383 let manifest = region.manifest_ctx.manifest().await;
384 let file_ref_manifest_version = self
386 .file_ref_manifest
387 .manifest_version
388 .get(®ion.region_id())
389 .cloned();
390 if file_ref_manifest_version != Some(manifest.manifest_version) {
391 warn!(
393 "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
394 file_ref_manifest_version,
395 region.region_id(),
396 manifest.manifest_version
397 );
398 GC_ERRORS_TOTAL
399 .with_label_values(&["manifest_mismatch"])
400 .inc();
401 return Ok(vec![]);
402 }
403 Some(manifest)
404 } else {
405 None
406 };
407
408 let all_entries = if let Some(manifest) = &manifest
409 && self.full_file_listing
410 {
411 self.list_from_object_store(region_id, manifest.files.len())
414 .await?
415 } else if manifest.is_none() && self.full_file_listing {
416 self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
420 .await?
421 } else {
422 vec![]
423 };
424
425 let recently_removed_files = if let Some(manifest) = &manifest {
426 self.get_removed_files_expel_times(manifest).await?
427 } else {
428 Default::default()
429 };
430
431 if recently_removed_files.is_empty() {
432 debug!("No recently removed files to gc for region {}", region_id);
434 }
435
436 let removed_file_cnt = recently_removed_files
437 .values()
438 .map(|s| s.len())
439 .sum::<usize>();
440
441 let current_files = manifest.as_ref().map(|m| &m.files);
442
443 let in_manifest = if let Some(current_files) = current_files {
444 current_files
445 .iter()
446 .map(|(file_id, meta)| (*file_id, meta.index_version()))
447 .collect::<HashMap<_, _>>()
448 } else {
449 Default::default()
450 };
451
452 let is_region_dropped = region.is_none();
453
454 let in_tmp_ref = tmp_ref_files
455 .iter()
456 .map(|file_ref| (file_ref.file_id, file_ref.index_version))
457 .collect::<HashSet<_>>();
458
459 let deletable_files = self
460 .list_to_be_deleted_files(
461 region_id,
462 is_region_dropped,
463 &in_manifest,
464 &in_tmp_ref,
465 recently_removed_files,
466 all_entries,
467 )
468 .await?;
469
470 let unused_file_cnt = deletable_files.len();
471
472 info!(
473 "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
474 if region.is_none() {
475 "(region dropped)"
476 } else {
477 ""
478 },
479 current_files.map(|c| c.len()).unwrap_or(0),
480 tmp_ref_files.len(),
481 removed_file_cnt,
482 deletable_files.len(),
483 );
484 debug!(
485 "gc: deletable files for region {}: {:?}",
486 region_id, &deletable_files
487 );
488
489 debug!(
490 "Found {} unused index files to delete for region {}",
491 deletable_files.len(),
492 region_id
493 );
494
495 let _delete_timer = GC_DURATION_SECONDS
496 .with_label_values(&["delete_files"])
497 .start_timer();
498 self.delete_files(region_id, &deletable_files).await?;
499
500 debug!(
501 "Successfully deleted {} unused files for region {}",
502 unused_file_cnt, region_id
503 );
504 if let Some(region) = ®ion {
505 let _update_timer = GC_DURATION_SECONDS
506 .with_label_values(&["update_manifest"])
507 .start_timer();
508 self.update_manifest_removed_files(region, deletable_files.clone())
509 .await?;
510 }
511
512 Ok(deletable_files)
513 }
514
    /// Deletes the given removed files for `region_id`: data (parquet) files
    /// are deleted in one batch, index (puffin) files one at a time.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_id = %region_id, removed_file_count = removed_files.len())
    )]
    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
        let mut index_ids = vec![];
        // Partition into data files (kept as (file_id, index_version) pairs)
        // and index files (accumulated into `index_ids` as a side effect).
        let file_pairs = removed_files
            .iter()
            .filter_map(|f| match f {
                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
                RemovedFile::Index(file_id, index_version) => {
                    let region_index_id =
                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
                    index_ids.push(region_index_id);
                    None
                }
            })
            .collect_vec();
        delete_files(
            region_id,
            &file_pairs,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        if !file_pairs.is_empty() {
            let deleted_count = file_pairs.len() as u64;
            GC_FILES_DELETED_TOTAL
                .with_label_values(&["parquet"])
                .inc_by(deleted_count);
            GC_DELETE_FILE_CNT.inc_by(deleted_count);
        }

        for index_id in index_ids {
            match delete_index(index_id, &self.access_layer, &self.cache_manager).await {
                Ok(()) => {
                    GC_FILES_DELETED_TOTAL.with_label_values(&["index"]).inc();
                    GC_DELETE_FILE_CNT.inc();
                }
                Err(err) => {
                    // Abort on the first failed index deletion; the remaining
                    // indexes will be retried by a later GC run.
                    GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
                    return Err(err);
                }
            }
        }

        Ok(())
    }
565
566 #[common_telemetry::tracing::instrument(
568 skip_all,
569 fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
570 )]
571 async fn update_manifest_removed_files(
572 &self,
573 region: &MitoRegionRef,
574 deleted_files: Vec<RemovedFile>,
575 ) -> Result<()> {
576 let deleted_file_cnt = deleted_files.len();
577 debug!(
578 "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
579 region.region_id()
580 );
581
582 let mut manager = region.manifest_ctx.manifest_manager.write().await;
583 let cnt = deleted_files.len();
584 manager.clear_deleted_files(deleted_files);
585 debug!(
586 "Updated region_id={} region manifest to clear {cnt} deleted files",
587 region.region_id(),
588 );
589
590 Ok(())
591 }
592
593 pub async fn get_removed_files_expel_times(
598 &self,
599 region_manifest: &Arc<RegionManifest>,
600 ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
601 let mut ret = BTreeMap::new();
602 for files in ®ion_manifest.removed_files.removed_files {
603 let expel_time = Timestamp::new_millisecond(files.removed_at);
604 let set = ret.entry(expel_time).or_insert_with(HashSet::new);
605 set.extend(files.files.iter().cloned());
606 }
607
608 Ok(ret)
609 }
610
611 async fn partition_region_files(
614 &self,
615 region_id: RegionId,
616 concurrency: usize,
617 ) -> Result<Vec<(Lister, Option<String>)>> {
618 let region_dir = self.access_layer.build_region_dir(region_id);
619
620 let partitions = gen_partition_from_concurrency(concurrency);
621 let bounds = vec![None]
622 .into_iter()
623 .chain(partitions.iter().map(|p| Some(p.clone())))
624 .chain(vec![None])
625 .collect::<Vec<_>>();
626
627 let mut listers = vec![];
628 for part in bounds.windows(2) {
629 let start = part[0].clone();
630 let end = part[1].clone();
631 let mut lister = self.access_layer.object_store().lister_with(®ion_dir);
632 if let Some(s) = start {
633 lister = lister.start_after(&s);
634 }
635
636 let lister = lister.await.context(OpenDalSnafu)?;
637 listers.push((lister, end));
638 }
639
640 Ok(listers)
641 }
642
    /// Lists all file entries under the region directory on the object store,
    /// spreading the work over concurrent listers sized by `file_cnt_hint`.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
    )]
    async fn list_from_object_store(
        &self,
        region_id: RegionId,
        file_cnt_hint: usize,
    ) -> Result<Vec<Entry>> {
        let _timer = GC_DURATION_SECONDS
            .with_label_values(&["list_files"])
            .start_timer();
        let start = tokio::time::Instant::now();
        // One lister per CONCURRENCY_LIST_PER_FILES files, at least 1, capped
        // by the configured per-job limit. (max-then-min, not clamp: the cap
        // could legitimately be below 1 in config, which clamp would panic on.)
        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let listers = self
            .partition_region_files(region_id, concurrency)
            .await
            .inspect_err(|_| {
                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
            })?;
        let lister_cnt = listers.len();

        let all_entries = self
            .list_region_files_concurrent(listers)
            .await
            .inspect_err(|_| {
                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
            })?;
        let cnt = all_entries.len();
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
            start.elapsed().as_secs_f64(),
            region_id
        );
        Ok(all_entries)
    }
686
    /// Drives the partitioned listers concurrently (one spawned task each) and
    /// gathers every file entry they produce.
    ///
    /// Each lister stops at its exclusive `end` boundary, or runs to the end
    /// of the directory when `end` is `None`. Per-entry listing errors are
    /// logged and dropped rather than failing the whole listing.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(
                async move {
                    // Stop once an entry name reaches this partition's upper
                    // bound. Erroneous entries are kept (returning true) so the
                    // stream continues; they are discarded at collection time.
                    let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                        Ok(e) => {
                            if let Some(end) = &end {
                                e.name() < end.as_str()
                            } else {
                                true
                            }
                        }
                        Err(err) => {
                            warn!("Failed to list entry: {}", err);
                            true
                        }
                    });
                    // Keep only file entries; directories are irrelevant here.
                    let stream = stream
                        .filter(|e| {
                            if let Ok(e) = &e {
                                e.metadata().is_file()
                            } else {
                                true
                            }
                        })
                        .collect::<Vec<_>>()
                        .await;
                    tx.send(stream).await.expect("Failed to send entries");
                }
                .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
            );

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // All tasks have finished (their `tx` clones are dropped); dropping the
        // original sender closes the channel so the receive loop terminates.
        drop(tx); let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            // Discard the error placeholders kept during listing.
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }
752
    /// Applies the deletion policy (`should_delete_file`) to every listed
    /// entry and returns the files that may be deleted.
    ///
    /// Parquet files are matched by file id alone; puffin (index) files by
    /// (file id, index version). Entries whose names can't be parsed are
    /// skipped and counted in `GC_SKIPPED_UNPARSABLE_FILES`.
    #[allow(clippy::too_many_arguments)]
    fn filter_deletable_files(
        &self,
        is_region_dropped: bool,
        entries: Vec<Entry>,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        may_linger_files: &HashSet<&RemovedFile>,
        eligible_for_delete: &HashSet<&RemovedFile>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> Vec<RemovedFile> {
        let mut ready_for_delete = vec![];
        // Re-index the tmp refs by file id; only Some(version) entries are
        // recorded in the per-file version sets.
        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
            in_tmp_ref
                .iter()
                .fold(HashMap::new(), |mut acc, (file, version)| {
                    let indices = acc.entry(*file).or_default();
                    if let Some(version) = version {
                        indices.insert(*version);
                    }
                    acc
                });

        // Re-index lingering files by file id for O(1) lookups per entry.
        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        // Same re-indexing for files already eligible for deletion.
        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        for entry in entries {
            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
                Ok((file_id, file_type)) => (file_id, file_type),
                Err(err) => {
                    // Unparsable names are skipped, never deleted.
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                    continue;
                }
            };

            let should_delete = match file_type {
                FileType::Parquet => {
                    // Data files are matched by file id only.
                    let is_in_manifest = in_manifest.contains_key(&file_id);
                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
                    let is_linger = may_linger_files.contains_key(&file_id);
                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
                FileType::Puffin(version) => {
                    // Index files must match both file id and index version.
                    let is_in_manifest = in_manifest
                        .get(&file_id)
                        .map(|opt_ver| *opt_ver == Some(version))
                        .unwrap_or(false);
                    let is_in_tmp_ref = in_tmp_ref
                        .get(&file_id)
                        .map(|versions| versions.contains(&version))
                        .unwrap_or(false);
                    let is_linger = may_linger_files
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);
                    let is_eligible_for_delete = eligible_for_delete
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
            };

            if should_delete {
                let removed_file = match file_type {
                    FileType::Parquet => {
                        // Index version is unknown when reconstructed from a
                        // listed data-file path.
                        RemovedFile::File(file_id, None)
                    }
                    FileType::Puffin(version) => {
                        GC_ORPHANED_INDEX_FILES.inc();
                        RemovedFile::Index(file_id, version)
                    }
                };
                ready_for_delete.push(removed_file);
            }
        }
        ready_for_delete
    }
870
    /// Computes the files to delete for a region.
    ///
    /// Splits `recently_removed_files` by the lingering threshold into files
    /// that must still linger and files eligible for removal. In fast mode
    /// (no full listing) only the eligible, not-in-use files are returned; in
    /// full-listing mode every listed entry is evaluated via
    /// `filter_deletable_files`, which can also catch orphaned/unknown files.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        is_region_dropped: bool,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        // Files removed after this instant must keep lingering; `None` means
        // lingering is disabled entirely.
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
        // split_off keeps entries < threshold (old enough => removable) and
        // returns entries >= threshold (too recent => must linger).
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        if !self.full_file_listing {
            // Fast mode: delete only eligible files that are neither in the
            // manifest nor temporarily referenced.
            // NOTE(review): for RemovedFile::File, `in_manifest.get(file_id) ==
            // Some(index_version)` treats a manifest entry with a *different*
            // index version as "not in use" — presumably intentional because
            // removed-file records carry the version at removal time, but
            // worth confirming.
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file_id| {
                    let in_use = match file_id {
                        RemovedFile::File(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full-listing mode: evaluate every entry found on the object store.
        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            is_region_dropped,
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
978}
979
/// Generates `next_power_of_two(concurrency) - 1` evenly spaced lowercase
/// hexadecimal boundary strings that partition the file-name key space,
/// e.g. `4 -> ["4", "8", "c"]` and `32 -> ["08", "10", ..., "f8"]`.
///
/// Returns an empty vector when one partition suffices.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let parts = concurrency.next_power_of_two();
    if parts <= 1 {
        return vec![];
    }

    // Smallest number of hex digits such that 16^digits >= parts; `space`
    // ends up as 16^digits, the size of the keyspace covered by that width.
    let mut digits = 0usize;
    let mut space: u128 = 1;
    while space < parts as u128 {
        space *= 16;
        digits += 1;
    }

    // Evenly spaced interior boundaries, zero-padded to `digits` hex chars.
    let step = space / parts as u128;
    let mut boundaries = Vec::with_capacity(parts - 1);
    for i in 1..parts {
        boundaries.push(format!("{:0digits$x}", i as u128 * step, digits = digits));
    }
    boundaries
}
1015
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies boundary generation for various concurrency levels, including
    /// rounding up to the next power of two (3 -> 4) and two-digit widths (32).
    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}
1055}