1use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::tracing::Instrument as _;
30use common_telemetry::{debug, error, info, warn};
31use common_time::Timestamp;
32use itertools::Itertools;
33use object_store::{Entry, ErrorKind, Lister};
34use serde::{Deserialize, Serialize};
35use snafu::{ResultExt as _, ensure};
36use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
37use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
38use tokio_stream::StreamExt;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheManagerRef;
42use crate::cache::file_cache::FileType;
43use crate::config::MitoConfig;
44use crate::error::{
45 DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
46 TooManyGcJobsSnafu, UnexpectedSnafu,
47};
48use crate::manifest::action::{RegionManifest, RemovedFile};
49use crate::metrics::{
50 GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
51 GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
52};
53use crate::region::{MitoRegionRef, RegionRoleState};
54use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes};
55use crate::sst::location::{self};
56
57#[cfg(test)]
58mod worker_test;
59
60fn should_delete_file(
63 is_in_manifest: bool,
64 is_in_tmp_ref: bool,
65 is_linger: bool,
66 is_eligible_for_delete: bool,
67 is_region_dropped: bool,
68 _entry: &Entry,
69 _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71 let is_known = is_linger || is_eligible_for_delete;
72
73 !is_in_manifest
74 && !is_in_tmp_ref
75 && if is_known {
76 is_eligible_for_delete
77 } else {
78 !is_in_tmp_ref && is_region_dropped
79 }
80}
81
/// Limits how many GC jobs may hold a permit at the same time.
pub struct GcLimiter {
    /// Semaphore created with `gc_concurrency` permits; one permit is taken per GC job.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    /// Number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared, reference-counted handle to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91 pub fn new(gc_concurrency: usize) -> Self {
92 Self {
93 gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94 gc_concurrency,
95 }
96 }
97
98 pub fn running_gc_tasks(&self) -> u32 {
99 (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100 }
101
102 pub fn gc_concurrency(&self) -> u32 {
103 self.gc_concurrency as u32
104 }
105
106 pub fn gc_stat(&self) -> GcStat {
107 GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108 }
109
110 pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114 self.gc_job_limit
115 .clone()
116 .try_acquire_owned()
117 .map_err(|e| match e {
118 TryAcquireError::Closed => UnexpectedSnafu {
119 reason: format!("Failed to acquire gc permit: {e}"),
120 }
121 .build(),
122 TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123 })
124 }
125}
126
/// Configuration of the garbage collector.
///
/// Because of `#[serde(default)]`, any field missing from the serialized input
/// takes its value from [`GcConfig::default`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    /// NOTE(review): not read in this file; presumably checked by the caller.
    pub enable: bool,
    /// How long a file recorded as removed in the manifest is kept before it
    /// becomes eligible for deletion. `None` disables lingering, so every
    /// recorded removed file is eligible immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time for files that are not recorded as removed. It is turned
    /// into a cutoff time and handed to the deletion predicate, which currently
    /// does not read it.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Upper bound on the number of concurrent listers one GC job uses when
    /// listing a region directory.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of concurrent GC jobs.
    /// NOTE(review): not read in this file; presumably the permit count given to `GcLimiter::new` — confirm.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154 fn default() -> Self {
155 Self {
156 enable: false,
157 lingering_time: Some(Duration::from_secs(60)),
159 unknown_file_lingering_time: Duration::from_secs(60 * 60),
161 max_concurrent_lister_per_gc_job: 32,
162 max_concurrent_gc_job: 4,
163 }
164 }
165}
166
/// Worker that garbage-collects the files of a set of regions.
pub struct LocalGcWorker {
    /// Used to build region directories, reach the object store and delete files.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager, passed along when files and indexes are deleted.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to collect. A `None` value means the region was not found and is
    /// treated as dropped.
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC options (lingering times, lister concurrency).
    pub(crate) opt: GcConfig,
    /// Temporary file references and the manifest version they were taken at,
    /// per region.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Permit obtained from the limiter in `try_new`; stored so that it lives as
    /// long as the worker does.
    _permit: OwnedSemaphorePermit,
    /// When true, the region directory is listed from the object store; when
    /// false ("fast" mode), only files recorded as removed in the manifest are considered.
    pub full_file_listing: bool,
}
189
/// Manifest-related options, taken field by field from `MitoConfig`.
pub struct ManifestOpenConfig {
    /// Copied from `MitoConfig::compress_manifest`.
    pub compress_manifest: bool,
    /// Copied from `MitoConfig::manifest_checkpoint_distance`.
    pub manifest_checkpoint_distance: u64,
    /// Copied from `MitoConfig::experimental_manifest_keep_removed_file_count`.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Copied from `MitoConfig::experimental_manifest_keep_removed_file_ttl`.
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198 fn from(mito_config: MitoConfig) -> Self {
199 Self {
200 compress_manifest: mito_config.compress_manifest,
201 manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202 experimental_manifest_keep_removed_file_count: mito_config
203 .experimental_manifest_keep_removed_file_count,
204 experimental_manifest_keep_removed_file_ttl: mito_config
205 .experimental_manifest_keep_removed_file_ttl,
206 }
207 }
208}
209
210impl LocalGcWorker {
211 #[allow(clippy::too_many_arguments)]
215 pub async fn try_new(
216 access_layer: AccessLayerRef,
217 cache_manager: Option<CacheManagerRef>,
218 regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
219 opt: GcConfig,
220 file_ref_manifest: FileRefsManifest,
221 limiter: &GcLimiterRef,
222 full_file_listing: bool,
223 ) -> Result<Self> {
224 if let Some(first_region_id) = regions_to_gc.keys().next() {
225 let table_id = first_region_id.table_id();
226 for region_id in regions_to_gc.keys() {
227 ensure!(
228 region_id.table_id() == table_id,
229 InvalidRequestSnafu {
230 region_id: *region_id,
231 reason: format!(
232 "Region {} does not belong to table {}",
233 region_id, table_id
234 ),
235 }
236 );
237 }
238 }
239
240 let permit = limiter.permit()?;
241
242 Ok(Self {
243 access_layer,
244 cache_manager,
245 regions: regions_to_gc,
246 opt,
247 file_ref_manifest,
248 _permit: permit,
249 full_file_listing,
250 })
251 }
252
253 pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
255 let mut tmp_ref_files = HashMap::new();
256 for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
257 tmp_ref_files
258 .entry(*region_id)
259 .or_insert_with(HashSet::new)
260 .extend(file_refs.clone());
261 }
263
264 Ok(tmp_ref_files)
265 }
266
267 #[common_telemetry::tracing::instrument(
273 skip_all,
274 fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
275 )]
276 pub async fn run(self) -> Result<GcReport> {
277 info!("LocalGcWorker started");
278 let _timer = GC_DURATION_SECONDS
279 .with_label_values(&["total"])
280 .start_timer();
281 let now = std::time::Instant::now();
282
283 let mut deleted_files = HashMap::new();
284 let mut deleted_indexes = HashMap::new();
285 let mut processed_regions = HashSet::new();
286 let tmp_ref_files = self.read_tmp_ref_files().await?;
287 for (region_id, region) in &self.regions {
288 let per_region_time = std::time::Instant::now();
289 if region.as_ref().map(|r| r.manifest_ctx.current_state())
290 == Some(RegionRoleState::Follower)
291 {
292 return UnexpectedSnafu {
293 reason: format!(
294 "Region {} is in Follower state, should not run GC on follower regions",
295 region_id
296 ),
297 }
298 .fail();
299 }
300 let tmp_ref_files = tmp_ref_files
301 .get(region_id)
302 .cloned()
303 .unwrap_or_else(HashSet::new);
304 let files = self
305 .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
306 .await?;
307 let index_files = files
308 .iter()
309 .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
310 .collect_vec();
311 let data_files = files
312 .into_iter()
313 .filter_map(|f| match f {
314 RemovedFile::File(file_id, _) => Some(file_id),
315 RemovedFile::Index(_, _) => None,
316 })
317 .collect();
318 deleted_files.insert(*region_id, data_files);
319 deleted_indexes.insert(*region_id, index_files);
320 processed_regions.insert(*region_id);
321 debug!(
322 "GC for region {} took {} secs.",
323 region_id,
324 per_region_time.elapsed().as_secs_f32()
325 );
326 }
327 info!(
328 "LocalGcWorker finished after {} secs.",
329 now.elapsed().as_secs_f32()
330 );
331 let report = GcReport {
332 deleted_files,
333 deleted_indexes,
334 need_retry_regions: HashSet::new(),
335 processed_regions,
336 };
337 Ok(report)
338 }
339}
340
341impl LocalGcWorker {
    /// Number of files one lister is expected to cover: a full listing uses
    /// `file_cnt_hint / CONCURRENCY_LIST_PER_FILES` listers (at least one, capped
    /// by the config). It is also used as the file-count hint when no manifest
    /// is available.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
345
346 #[common_telemetry::tracing::instrument(
355 skip_all,
356 fields(
357 region_id = %region_id,
358 full_file_listing = self.full_file_listing,
359 region_present = region.is_some()
360 )
361 )]
362 pub async fn do_region_gc(
363 &self,
364 region_id: RegionId,
365 region: Option<MitoRegionRef>,
366 tmp_ref_files: &HashSet<FileRef>,
367 ) -> Result<Vec<RemovedFile>> {
368 let mode = if self.full_file_listing {
369 "full_listing"
370 } else {
371 "fast"
372 };
373 GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
374 debug!(
375 "Doing gc for region {}, {}",
376 region_id,
377 if region.is_some() {
378 "region found"
379 } else {
380 "region not found, might be dropped"
381 }
382 );
383
384 ensure!(
385 region.is_some() || self.full_file_listing,
386 InvalidRequestSnafu {
387 region_id,
388 reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
389 }
390 );
391
392 let manifest = if let Some(region) = ®ion {
393 let manifest = region.manifest_ctx.manifest().await;
394 let file_ref_manifest_version = self
396 .file_ref_manifest
397 .manifest_version
398 .get(®ion.region_id())
399 .cloned();
400 if file_ref_manifest_version != Some(manifest.manifest_version) {
401 warn!(
403 "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
404 file_ref_manifest_version,
405 region.region_id(),
406 manifest.manifest_version
407 );
408 GC_ERRORS_TOTAL
409 .with_label_values(&["manifest_mismatch"])
410 .inc();
411 return Ok(vec![]);
412 }
413 Some(manifest)
414 } else {
415 None
416 };
417
418 let all_entries = if let Some(manifest) = &manifest
419 && self.full_file_listing
420 {
421 self.list_from_object_store(region_id, manifest.files.len())
424 .await?
425 } else if manifest.is_none() && self.full_file_listing {
426 self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
430 .await?
431 } else {
432 vec![]
433 };
434
435 let recently_removed_files = if let Some(manifest) = &manifest {
436 self.get_removed_files_expel_times(manifest).await?
437 } else {
438 Default::default()
439 };
440
441 if recently_removed_files.is_empty() {
442 debug!("No recently removed files to gc for region {}", region_id);
444 }
445
446 let removed_file_cnt = recently_removed_files
447 .values()
448 .map(|s| s.len())
449 .sum::<usize>();
450
451 let current_files = manifest.as_ref().map(|m| &m.files);
452
453 let in_manifest = if let Some(current_files) = current_files {
454 current_files
455 .iter()
456 .map(|(file_id, meta)| (*file_id, meta.index_version()))
457 .collect::<HashMap<_, _>>()
458 } else {
459 Default::default()
460 };
461
462 let is_region_dropped = region.is_none();
463
464 let in_tmp_ref = tmp_ref_files
465 .iter()
466 .map(|file_ref| (file_ref.file_id, file_ref.index_version))
467 .collect::<HashSet<_>>();
468
469 let deletable_files = self
470 .list_to_be_deleted_files(
471 region_id,
472 is_region_dropped,
473 &in_manifest,
474 &in_tmp_ref,
475 recently_removed_files,
476 all_entries,
477 )
478 .await?;
479
480 let unused_file_cnt = deletable_files.len();
481
482 info!(
483 "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
484 if region.is_none() {
485 "(region dropped)"
486 } else {
487 ""
488 },
489 current_files.map(|c| c.len()).unwrap_or(0),
490 tmp_ref_files.len(),
491 removed_file_cnt,
492 deletable_files.len(),
493 );
494 debug!(
495 "gc: deletable files for region {}: {:?}",
496 region_id, &deletable_files
497 );
498
499 debug!(
500 "Found {} unused index files to delete for region {}",
501 deletable_files.len(),
502 region_id
503 );
504
505 let _delete_timer = GC_DURATION_SECONDS
506 .with_label_values(&["delete_files"])
507 .start_timer();
508 self.delete_files(region_id, &deletable_files).await?;
509
510 debug!(
511 "Successfully deleted {} unused files for region {}",
512 unused_file_cnt, region_id
513 );
514 if let Some(region) = ®ion {
515 let _update_timer = GC_DURATION_SECONDS
516 .with_label_values(&["update_manifest"])
517 .start_timer();
518 self.update_manifest_removed_files(region, deletable_files.clone())
519 .await?;
520 }
521
522 Ok(deletable_files)
523 }
524
525 #[common_telemetry::tracing::instrument(
526 skip_all,
527 fields(region_id = %region_id, removed_file_count = removed_files.len())
528 )]
529 async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
530 let mut index_ids = vec![];
531 let file_pairs = removed_files
532 .iter()
533 .filter_map(|f| match f {
534 RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
535 RemovedFile::Index(file_id, index_version) => {
536 let region_index_id =
537 RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
538 index_ids.push(region_index_id);
539 None
540 }
541 })
542 .collect_vec();
543 delete_files(
544 region_id,
545 &file_pairs,
546 true,
547 &self.access_layer,
548 &self.cache_manager,
549 )
550 .await?;
551
552 if !file_pairs.is_empty() {
553 let deleted_count = file_pairs.len() as u64;
554 GC_FILES_DELETED_TOTAL
555 .with_label_values(&["parquet"])
556 .inc_by(deleted_count);
557 GC_DELETE_FILE_CNT.inc_by(deleted_count);
558 }
559
560 if !index_ids.is_empty() {
561 let deleted_count = index_ids.len() as u64;
562 delete_indexes(&index_ids, &self.access_layer, &self.cache_manager)
563 .await
564 .inspect_err(|_| {
565 GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
566 })?;
567 GC_FILES_DELETED_TOTAL
568 .with_label_values(&["index"])
569 .inc_by(deleted_count);
570 GC_DELETE_FILE_CNT.inc_by(deleted_count);
571 }
572
573 Ok(())
574 }
575
576 #[common_telemetry::tracing::instrument(
578 skip_all,
579 fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
580 )]
581 async fn update_manifest_removed_files(
582 &self,
583 region: &MitoRegionRef,
584 deleted_files: Vec<RemovedFile>,
585 ) -> Result<()> {
586 let deleted_file_cnt = deleted_files.len();
587 debug!(
588 "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
589 region.region_id()
590 );
591
592 let mut manager = region.manifest_ctx.manifest_manager.write().await;
593 let cnt = deleted_files.len();
594 manager.clear_deleted_files(deleted_files);
595 debug!(
596 "Updated region_id={} region manifest to clear {cnt} deleted files",
597 region.region_id(),
598 );
599
600 Ok(())
601 }
602
603 pub async fn get_removed_files_expel_times(
608 &self,
609 region_manifest: &Arc<RegionManifest>,
610 ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
611 let mut ret = BTreeMap::new();
612 for files in ®ion_manifest.removed_files.removed_files {
613 let expel_time = Timestamp::new_millisecond(files.removed_at);
614 let set = ret.entry(expel_time).or_insert_with(HashSet::new);
615 set.extend(files.files.iter().cloned());
616 }
617
618 Ok(ret)
619 }
620
621 async fn partition_region_files(
624 &self,
625 region_id: RegionId,
626 concurrency: usize,
627 ) -> Result<Vec<(Lister, Option<String>)>> {
628 let region_dir = self.access_layer.build_region_dir(region_id);
629
630 let partitions = gen_partition_from_concurrency(concurrency);
631 let bounds = vec![None]
632 .into_iter()
633 .chain(partitions.iter().map(|p| Some(p.clone())))
634 .chain(vec![None])
635 .collect::<Vec<_>>();
636
637 let mut listers = vec![];
638 for part in bounds.windows(2) {
639 let start = part[0].clone();
640 let end = part[1].clone();
641 let mut lister = self.access_layer.object_store().lister_with(®ion_dir);
642 if let Some(s) = start {
643 lister = lister.start_after(&s);
644 }
645
646 let lister = lister.await.context(OpenDalSnafu)?;
647 listers.push((lister, end));
648 }
649
650 Ok(listers)
651 }
652
653 #[common_telemetry::tracing::instrument(
657 skip_all,
658 fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
659 )]
660 async fn list_from_object_store(
661 &self,
662 region_id: RegionId,
663 file_cnt_hint: usize,
664 ) -> Result<Vec<Entry>> {
665 let _timer = GC_DURATION_SECONDS
666 .with_label_values(&["list_files"])
667 .start_timer();
668 let start = tokio::time::Instant::now();
669 let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
670 .max(1)
671 .min(self.opt.max_concurrent_lister_per_gc_job);
672
673 let listers = self
674 .partition_region_files(region_id, concurrency)
675 .await
676 .inspect_err(|_| {
677 GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
678 })?;
679 let lister_cnt = listers.len();
680
681 let mut all_entries = self
683 .list_region_files_concurrent(listers)
684 .await
685 .inspect_err(|_| {
686 GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
687 })?;
688 let root_cnt = all_entries.len();
689
690 let index_entries = self
694 .list_region_index_files(region_id)
695 .await
696 .inspect_err(|_| {
697 GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
698 })?;
699 let index_cnt = index_entries.len();
700 all_entries.extend(index_entries);
701 info!(
702 "gc: full listing mode cost {} secs using {lister_cnt} lister for root={root_cnt} index={index_cnt} files in region {}.",
703 start.elapsed().as_secs_f64(),
704 region_id
705 );
706 Ok(all_entries)
707 }
708
709 async fn list_region_index_files(&self, region_id: RegionId) -> Result<Vec<Entry>> {
713 let region_dir = self.access_layer.build_region_dir(region_id);
714 let index_dir = object_store::util::join_dir(®ion_dir, "index");
715
716 let mut lister = match self
717 .access_layer
718 .object_store()
719 .lister_with(&index_dir)
720 .await
721 {
722 Ok(l) => l,
723 Err(e) if e.kind() == ErrorKind::NotFound => {
724 debug!(
728 "Index directory not found for region {}: {}. Treating as empty.",
729 region_id, e
730 );
731 return Ok(vec![]);
732 }
733 Err(e) => return Err(e).context(OpenDalSnafu),
734 };
735
736 let mut entries = Vec::new();
737 while let Some(entry) = lister.next().await {
738 let entry = entry.context(OpenDalSnafu)?;
739 if entry.metadata().is_file() && entry.name().ends_with(".puffin") {
740 entries.push(entry);
741 }
742 }
743
744 Ok(entries)
745 }
746
747 async fn list_region_files_concurrent(
750 &self,
751 listers: Vec<(Lister, Option<String>)>,
752 ) -> Result<Vec<Entry>> {
753 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
754 let mut handles = vec![];
755
756 for (lister, end) in listers {
757 let tx = tx.clone();
758 let handle = tokio::spawn(
759 async move {
760 let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
761 Ok(e) => {
762 if let Some(end) = &end {
763 e.name() < end.as_str()
765 } else {
766 true
768 }
769 }
770 Err(err) => {
773 warn!("Failed to list entry: {}", err);
774 true
775 }
776 });
777 let stream = stream
778 .filter(|e| {
779 if let Ok(e) = &e {
780 e.metadata().is_file()
782 } else {
783 true
785 }
786 })
787 .collect::<Vec<_>>()
788 .await;
789 tx.send(stream).await.expect("Failed to send entries");
791 }
792 .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
793 );
794
795 handles.push(handle);
796 }
797
798 for handle in handles {
800 handle.await.context(JoinSnafu)?;
801 }
802
803 drop(tx); let mut all_entries = vec![];
807 while let Some(stream) = rx.recv().await {
808 for entry in stream {
809 all_entries.push(entry.context(OpenDalSnafu)?);
810 }
811 }
812
813 Ok(all_entries)
814 }
815
816 #[allow(clippy::too_many_arguments)]
817 fn filter_deletable_files(
818 &self,
819 is_region_dropped: bool,
820 entries: Vec<Entry>,
821 in_manifest: &HashMap<FileId, Option<IndexVersion>>,
822 in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
823 may_linger_files: &HashSet<&RemovedFile>,
824 eligible_for_delete: &HashSet<&RemovedFile>,
825 unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
826 ) -> Vec<RemovedFile> {
827 let mut ready_for_delete = vec![];
828 let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
830 in_tmp_ref
831 .iter()
832 .fold(HashMap::new(), |mut acc, (file, version)| {
833 let indices = acc.entry(*file).or_default();
834 if let Some(version) = version {
835 indices.insert(*version);
836 }
837 acc
838 });
839
840 let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
841 .iter()
842 .fold(HashMap::new(), |mut acc, file| {
843 let indices = acc.entry(file.file_id()).or_default();
844 indices.insert(file);
845 acc
846 });
847
848 let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
849 .iter()
850 .fold(HashMap::new(), |mut acc, file| {
851 let indices = acc.entry(file.file_id()).or_default();
852 indices.insert(file);
853 acc
854 });
855
856 for entry in entries {
857 let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
858 Ok((file_id, file_type)) => (file_id, file_type),
859 Err(err) => {
860 error!(err; "Failed to parse file id from path: {}", entry.name());
861 GC_SKIPPED_UNPARSABLE_FILES.inc();
864 continue;
865 }
866 };
867
868 let should_delete = match file_type {
869 FileType::Parquet => {
870 let is_in_manifest = in_manifest.contains_key(&file_id);
871 let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
872 let is_linger = may_linger_files.contains_key(&file_id);
873 let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
874
875 should_delete_file(
876 is_in_manifest,
877 is_in_tmp_ref,
878 is_linger,
879 is_eligible_for_delete,
880 is_region_dropped,
881 &entry,
882 unknown_file_may_linger_until,
883 )
884 }
885 FileType::Puffin(version) => {
886 let is_in_manifest = in_manifest
888 .get(&file_id)
889 .map(|opt_ver| *opt_ver == Some(version))
890 .unwrap_or(false);
891 let is_in_tmp_ref = in_tmp_ref
892 .get(&file_id)
893 .map(|versions| versions.contains(&version))
894 .unwrap_or(false);
895 let is_linger = may_linger_files
896 .get(&file_id)
897 .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
898 .unwrap_or(false);
899 let is_eligible_for_delete = eligible_for_delete
900 .get(&file_id)
901 .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
902 .unwrap_or(false);
903
904 should_delete_file(
905 is_in_manifest,
906 is_in_tmp_ref,
907 is_linger,
908 is_eligible_for_delete,
909 is_region_dropped,
910 &entry,
911 unknown_file_may_linger_until,
912 )
913 }
914 };
915
916 if should_delete {
917 let removed_file = match file_type {
918 FileType::Parquet => {
919 RemovedFile::File(file_id, None)
922 }
923 FileType::Puffin(version) => {
924 GC_ORPHANED_INDEX_FILES.inc();
925 RemovedFile::Index(file_id, version)
926 }
927 };
928 ready_for_delete.push(removed_file);
929 }
930 }
931 ready_for_delete
932 }
933
934 pub async fn list_to_be_deleted_files(
943 &self,
944 region_id: RegionId,
945 is_region_dropped: bool,
946 in_manifest: &HashMap<FileId, Option<IndexVersion>>,
947 in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
948 recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
949 all_entries: Vec<Entry>,
950 ) -> Result<Vec<RemovedFile>> {
951 let now = chrono::Utc::now();
952 let may_linger_until = self
953 .opt
954 .lingering_time
955 .map(|lingering_time| {
956 chrono::Duration::from_std(lingering_time)
957 .with_context(|_| DurationOutOfRangeSnafu {
958 input: lingering_time,
959 })
960 .map(|t| now - t)
961 })
962 .transpose()?;
963
964 let unknown_file_may_linger_until = now
965 - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
966 |_| DurationOutOfRangeSnafu {
967 input: self.opt.unknown_file_lingering_time,
968 },
969 )?;
970
971 let threshold =
973 may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
974 let mut recently_removed_files = recently_removed_files;
977 let may_linger_files = match threshold {
978 Some(threshold) => recently_removed_files.split_off(&threshold),
979 None => BTreeMap::new(),
980 };
981 debug!("may_linger_files: {:?}", may_linger_files);
982
983 let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();
984
985 let eligible_for_removal = recently_removed_files
988 .values()
989 .flatten()
990 .collect::<HashSet<_>>();
991
992 if !self.full_file_listing {
995 let files_to_delete: Vec<RemovedFile> = eligible_for_removal
1000 .iter()
1001 .filter(|file_id| {
1002 let in_use = match file_id {
1003 RemovedFile::File(file_id, index_version) => {
1004 in_manifest.get(file_id) == Some(index_version)
1005 || in_tmp_ref.contains(&(*file_id, *index_version))
1006 }
1007 RemovedFile::Index(file_id, index_version) => {
1008 in_manifest.get(file_id) == Some(&Some(*index_version))
1009 || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
1010 }
1011 };
1012 !in_use
1013 })
1014 .map(|&f| f.clone())
1015 .collect();
1016
1017 info!(
1018 "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
1019 region_id,
1020 files_to_delete.len()
1021 );
1022
1023 return Ok(files_to_delete);
1024 }
1025
1026 let all_unused_files_ready_for_delete = self.filter_deletable_files(
1030 is_region_dropped,
1031 all_entries,
1032 in_manifest,
1033 in_tmp_ref,
1034 &all_may_linger_files,
1035 &eligible_for_removal,
1036 unknown_file_may_linger_until,
1037 );
1038
1039 Ok(all_unused_files_ready_for_delete)
1040 }
1041}
1042
/// Generates the hex boundaries that split the key space into equal partitions.
///
/// `concurrency` is rounded up to the next power of two `n`; the result holds
/// the `n - 1` inner boundaries in ascending order, as lower-case hex strings
/// zero-padded to the smallest digit count `d` with `16^d >= n`. For `n <= 1`
/// the result is empty (a single partition needs no boundary).
///
/// For example, a concurrency of 4 yields `["4", "8", "c"]`.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let partition_cnt = concurrency.next_power_of_two() as u128;
    if partition_cnt <= 1 {
        return Vec::new();
    }

    // Smallest power of 16 that can hold `partition_cnt` distinct prefixes.
    let mut hex_digits: usize = 0;
    let mut key_space: u128 = 1;
    while key_space < partition_cnt {
        key_space *= 16;
        hex_digits += 1;
    }

    // Both values are powers of two, so the division is exact.
    let stride = key_space / partition_cnt;

    let mut boundaries = Vec::with_capacity(partition_cnt as usize - 1);
    let mut boundary = stride;
    while boundary < key_space {
        boundaries.push(format!("{boundary:0hex_digits$x}"));
        boundary += stride;
    }
    boundaries
}
1078
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the generated boundaries for a range of concurrency values,
    /// including one that is not a power of two.
    #[test]
    fn test_gen_partition_from_concurrency() {
        let cases: [(usize, &[&str]); 7] = [
            (1, &[]),
            (2, &["8"]),
            (3, &["4", "8", "c"]),
            (4, &["4", "8", "c"]),
            (8, &["2", "4", "6", "8", "a", "c", "e"]),
            (
                16,
                &[
                    "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f",
                ],
            ),
            (
                32,
                &[
                    "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68",
                    "70", "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0",
                    "d8", "e0", "e8", "f0", "f8",
                ],
            ),
        ];

        for (concurrency, expected) in cases {
            let partitions = gen_partition_from_concurrency(concurrency);
            assert_eq!(partitions, expected, "concurrency = {concurrency}");
        }
    }
}