1use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::tracing::Instrument as _;
30use common_telemetry::{debug, error, info, warn};
31use common_time::Timestamp;
32use itertools::Itertools;
33use object_store::{Entry, Lister};
34use serde::{Deserialize, Serialize};
35use snafu::{ResultExt as _, ensure};
36use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
37use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
38use tokio_stream::StreamExt;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheManagerRef;
42use crate::cache::file_cache::FileType;
43use crate::config::MitoConfig;
44use crate::error::{
45 DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
46 TooManyGcJobsSnafu, UnexpectedSnafu,
47};
48use crate::manifest::action::{RegionManifest, RemovedFile};
49use crate::metrics::{
50 GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
51 GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
52};
53use crate::region::{MitoRegionRef, RegionRoleState};
54use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes};
55use crate::sst::location::{self};
56
57#[cfg(test)]
58mod worker_test;
59
60fn should_delete_file(
63 is_in_manifest: bool,
64 is_in_tmp_ref: bool,
65 is_linger: bool,
66 is_eligible_for_delete: bool,
67 is_region_dropped: bool,
68 _entry: &Entry,
69 _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71 let is_known = is_linger || is_eligible_for_delete;
72
73 !is_in_manifest
74 && !is_in_tmp_ref
75 && if is_known {
76 is_eligible_for_delete
77 } else {
78 !is_in_tmp_ref && is_region_dropped
79 }
80}
81
/// Limits how many GC jobs may run concurrently, using a semaphore permit per
/// running job.
pub struct GcLimiter {
    /// One permit per allowed concurrent GC job; acquired in `permit()`.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    /// Total number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared, reference-counted [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91 pub fn new(gc_concurrency: usize) -> Self {
92 Self {
93 gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94 gc_concurrency,
95 }
96 }
97
98 pub fn running_gc_tasks(&self) -> u32 {
99 (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100 }
101
102 pub fn gc_concurrency(&self) -> u32 {
103 self.gc_concurrency as u32
104 }
105
106 pub fn gc_stat(&self) -> GcStat {
107 GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108 }
109
110 pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114 self.gc_job_limit
115 .clone()
116 .try_acquire_owned()
117 .map_err(|e| match e {
118 TryAcquireError::Closed => UnexpectedSnafu {
119 reason: format!("Failed to acquire gc permit: {e}"),
120 }
121 .build(),
122 TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123 })
124 }
125}
126
/// Configuration for the GC worker.
///
/// All fields fall back to [`GcConfig::default`] when absent (`serde(default)`),
/// and durations are (de)serialized in humantime form (e.g. "60s", "1h").
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// How long a file removed from the manifest may linger before becoming
    /// eligible for deletion; `None` disables the lingering window entirely.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time applied to files that are not tracked by the manifest's
    /// removed-file records ("unknown" files).
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Upper bound on concurrent object-store listers used by one GC job
    /// during full file listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of GC jobs allowed to run at once.
    // NOTE(review): presumably used to size `GcLimiter` — confirm at the
    // call site that constructs the limiter.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154 fn default() -> Self {
155 Self {
156 enable: false,
157 lingering_time: Some(Duration::from_secs(60)),
159 unknown_file_lingering_time: Duration::from_secs(60 * 60),
161 max_concurrent_lister_per_gc_job: 32,
162 max_concurrent_gc_job: 4,
163 }
164 }
165}
166
/// A worker that garbage-collects unused SST/index files for a batch of
/// regions belonging to one table.
pub struct LocalGcWorker {
    /// Access layer used to reach the object store and build region paths.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager; deleted files are also purged from caches.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC. `None` means the region is not found locally
    /// (likely dropped), which changes what may be deleted.
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC tuning knobs (lingering windows, concurrency limits).
    pub(crate) opt: GcConfig,
    /// Temporary file references plus the manifest versions they were taken
    /// against; used to avoid deleting files that are still in use.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Concurrency permit held for the lifetime of this worker.
    _permit: OwnedSemaphorePermit,
    /// When true, list the whole region directory from the object store
    /// instead of relying solely on manifest removed-file records.
    pub full_file_listing: bool,
}
189
/// Subset of [`MitoConfig`] needed to open a region manifest.
pub struct ManifestOpenConfig {
    /// Whether manifest files are compressed.
    pub compress_manifest: bool,
    /// Number of manifest actions between checkpoints.
    pub manifest_checkpoint_distance: u64,
    /// Max number of removed-file records kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// TTL for removed-file records kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198 fn from(mito_config: MitoConfig) -> Self {
199 Self {
200 compress_manifest: mito_config.compress_manifest,
201 manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202 experimental_manifest_keep_removed_file_count: mito_config
203 .experimental_manifest_keep_removed_file_count,
204 experimental_manifest_keep_removed_file_ttl: mito_config
205 .experimental_manifest_keep_removed_file_ttl,
206 }
207 }
208}
209
impl LocalGcWorker {
    /// Creates a worker for the given regions.
    ///
    /// All regions must belong to the same table (checked against the first
    /// key's table id). A GC-job permit is acquired from `limiter` up front
    /// and held until the worker is dropped.
    ///
    /// # Errors
    /// - `InvalidRequest` when a region belongs to a different table.
    /// - `TooManyGcJobs` (via [`GcLimiter::permit`]) when no permit is free.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        // Every region in one batch must share the first region's table id.
        if let Some(first_region_id) = regions_to_gc.keys().next() {
            let table_id = first_region_id.table_id();
            for region_id in regions_to_gc.keys() {
                ensure!(
                    region_id.table_id() == table_id,
                    InvalidRequestSnafu {
                        region_id: *region_id,
                        reason: format!(
                            "Region {} does not belong to table {}",
                            region_id, table_id
                        ),
                    }
                );
            }
        }

        // Reserve a concurrency slot before doing any work; fails fast when
        // too many GC jobs are already running.
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Groups the temporary file references from the ref manifest by region id.
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Runs GC for every region in `self.regions` and aggregates per-region
    /// deleted files/indexes into a [`GcReport`].
    ///
    /// # Errors
    /// Fails if any region is currently in `Follower` state (GC must only
    /// run on leader regions) or if a per-region GC pass fails.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
    )]
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let _timer = GC_DURATION_SECONDS
            .with_label_values(&["total"])
            .start_timer();
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let mut deleted_indexes = HashMap::new();
        let mut processed_regions = HashSet::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            // Refuse to GC follower replicas; only the leader may delete files.
            if region.as_ref().map(|r| r.manifest_ctx.current_state())
                == Some(RegionRoleState::Follower)
            {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            // Tmp refs for this region; absent entry means no refs.
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self
                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
                .await?;
            // Deleted index files are reported separately as
            // (file id, index version) pairs.
            let index_files = files
                .iter()
                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
                .collect_vec();
            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
            deleted_indexes.insert(*region_id, index_files);
            processed_regions.insert(*region_id);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            deleted_indexes,
            need_retry_regions: HashSet::new(),
            processed_regions,
        };
        Ok(report)
    }
}
333
impl LocalGcWorker {
    /// Rough number of files each concurrent lister is expected to cover
    /// when partitioning a full object-store listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Runs GC for one region and returns the files that were deleted.
    ///
    /// `region == None` means the region is not found locally (likely
    /// dropped); in that case a full file listing is mandatory. When the
    /// tmp-ref manifest version does not match the region's current manifest
    /// version, GC for the region is skipped (returns an empty vec).
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(
            region_id = %region_id,
            full_file_listing = self.full_file_listing,
            region_present = region.is_some()
        )
    )]
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        region: Option<MitoRegionRef>,
        tmp_ref_files: &HashSet<FileRef>,
    ) -> Result<Vec<RemovedFile>> {
        let mode = if self.full_file_listing {
            "full_listing"
        } else {
            "fast"
        };
        GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
        debug!(
            "Doing gc for region {}, {}",
            region_id,
            if region.is_some() {
                "region found"
            } else {
                "region not found, might be dropped"
            }
        );

        // A missing region without a full listing would leave us blind;
        // refuse rather than risk deleting live files.
        ensure!(
            region.is_some() || self.full_file_listing,
            InvalidRequestSnafu {
                region_id,
                reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
            }
        );

        let manifest = if let Some(region) = &region {
            let manifest = region.manifest_ctx.manifest().await;
            // The tmp refs were collected against a specific manifest
            // version; a mismatch means they may be stale, so skip GC.
            let file_ref_manifest_version = self
                .file_ref_manifest
                .manifest_version
                .get(&region.region_id())
                .cloned();
            if file_ref_manifest_version != Some(manifest.manifest_version) {
                warn!(
                    "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
                    file_ref_manifest_version,
                    region.region_id(),
                    manifest.manifest_version
                );
                GC_ERRORS_TOTAL
                    .with_label_values(&["manifest_mismatch"])
                    .inc();
                return Ok(vec![]);
            }
            Some(manifest)
        } else {
            None
        };

        // Full listing: size the lister fan-out from the manifest file count
        // when available, otherwise fall back to a fixed hint.
        let all_entries = if let Some(manifest) = &manifest
            && self.full_file_listing
        {
            self.list_from_object_store(region_id, manifest.files.len())
                .await?
        } else if manifest.is_none() && self.full_file_listing {
            self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
                .await?
        } else {
            vec![]
        };

        let recently_removed_files = if let Some(manifest) = &manifest {
            self.get_removed_files_expel_times(manifest).await?
        } else {
            Default::default()
        };

        if recently_removed_files.is_empty() {
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let current_files = manifest.as_ref().map(|m| &m.files);

        // Files currently referenced by the manifest: file id -> index version.
        let in_manifest = if let Some(current_files) = current_files {
            current_files
                .iter()
                .map(|(file_id, meta)| (*file_id, meta.index_version()))
                .collect::<HashMap<_, _>>()
        } else {
            Default::default()
        };

        let is_region_dropped = region.is_none();

        // Files pinned by temporary (in-flight) references.
        let in_tmp_ref = tmp_ref_files
            .iter()
            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
            .collect::<HashSet<_>>();

        let deletable_files = self
            .list_to_be_deleted_files(
                region_id,
                is_region_dropped,
                &in_manifest,
                &in_tmp_ref,
                recently_removed_files,
                all_entries,
            )
            .await?;

        let unused_file_cnt = deletable_files.len();

        info!(
            "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
            if region.is_none() {
                "(region dropped)"
            } else {
                ""
            },
            current_files.map(|c| c.len()).unwrap_or(0),
            tmp_ref_files.len(),
            removed_file_cnt,
            deletable_files.len(),
        );
        debug!(
            "gc: deletable files for region {}: {:?}",
            region_id, &deletable_files
        );

        debug!(
            "Found {} unused index files to delete for region {}",
            deletable_files.len(),
            region_id
        );

        let _delete_timer = GC_DURATION_SECONDS
            .with_label_values(&["delete_files"])
            .start_timer();
        self.delete_files(region_id, &deletable_files).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // Tell the manifest which removed-file records are now fully cleaned
        // up, so they can be dropped from the records.
        if let Some(region) = &region {
            let _update_timer = GC_DURATION_SECONDS
                .with_label_values(&["update_manifest"])
                .start_timer();
            self.update_manifest_removed_files(region, deletable_files.clone())
                .await?;
        }

        Ok(deletable_files)
    }

    /// Physically deletes the given removed files (parquet data and index
    /// files) from storage/caches and bumps the GC metrics.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_id = %region_id, removed_file_count = removed_files.len())
    )]
    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
        let mut index_ids = vec![];
        // Split input: parquet files become (file id, index version) pairs;
        // standalone index files are accumulated into `index_ids`.
        let file_pairs = removed_files
            .iter()
            .filter_map(|f| match f {
                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
                RemovedFile::Index(file_id, index_version) => {
                    let region_index_id =
                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
                    index_ids.push(region_index_id);
                    None
                }
            })
            .collect_vec();
        delete_files(
            region_id,
            &file_pairs,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        if !file_pairs.is_empty() {
            let deleted_count = file_pairs.len() as u64;
            GC_FILES_DELETED_TOTAL
                .with_label_values(&["parquet"])
                .inc_by(deleted_count);
            GC_DELETE_FILE_CNT.inc_by(deleted_count);
        }

        if !index_ids.is_empty() {
            let deleted_count = index_ids.len() as u64;
            delete_indexes(&index_ids, &self.access_layer, &self.cache_manager)
                .await
                .inspect_err(|_| {
                    GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
                })?;
            GC_FILES_DELETED_TOTAL
                .with_label_values(&["index"])
                .inc_by(deleted_count);
            GC_DELETE_FILE_CNT.inc_by(deleted_count);
        }

        Ok(())
    }

    /// Clears the just-deleted files from the region manifest's removed-file
    /// records so they are not considered again by future GC runs.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
    )]
    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<RemovedFile>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        let cnt = deleted_files.len();
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

    /// Groups the manifest's removed-file records by the millisecond
    /// timestamp at which they were removed.
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.files.iter().cloned());
        }

        Ok(ret)
    }

    /// Builds `concurrency` object-store listers covering disjoint, ordered
    /// slices of the region directory's key space.
    ///
    /// Each returned pair is `(lister, exclusive_upper_bound)`; a `None`
    /// bound means "until the end of the key space".
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        // Bounds look like [None, p1, p2, ..., None]; each adjacent pair
        // delimits one lister's range.
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Lists every file under the region directory, fanning out to multiple
    /// concurrent listers sized from `file_cnt_hint`.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
    )]
    async fn list_from_object_store(
        &self,
        region_id: RegionId,
        file_cnt_hint: usize,
    ) -> Result<Vec<Entry>> {
        let _timer = GC_DURATION_SECONDS
            .with_label_values(&["list_files"])
            .start_timer();
        let start = tokio::time::Instant::now();
        // One lister per CONCURRENCY_LIST_PER_FILES files, at least one,
        // capped by the configured per-job limit.
        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let listers = self
            .partition_region_files(region_id, concurrency)
            .await
            .inspect_err(|_| {
                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
            })?;
        let lister_cnt = listers.len();

        let all_entries = self
            .list_region_files_concurrent(listers)
            .await
            .inspect_err(|_| {
                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
            })?;
        let cnt = all_entries.len();
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
            start.elapsed().as_secs_f64(),
            region_id
        );
        Ok(all_entries)
    }

    /// Drives the partitioned listers concurrently (one spawned task each)
    /// and gathers all file entries through an mpsc channel.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(
                async move {
                    // Stop at this partition's exclusive upper bound; listing
                    // errors are logged but do not terminate the stream.
                    let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                        Ok(e) => {
                            if let Some(end) = &end {
                                e.name() < end.as_str()
                            } else {
                                true
                            }
                        }
                        Err(err) => {
                            warn!("Failed to list entry: {}", err);
                            true
                        }
                    });
                    // Keep only file entries (drop directory entries).
                    let stream = stream
                        .filter(|e| {
                            if let Ok(e) = &e {
                                e.metadata().is_file()
                            } else {
                                true
                            }
                        })
                        .collect::<Vec<_>>()
                        .await;
                    tx.send(stream).await.expect("Failed to send entries");
                }
                .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
            );

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // Drop our own sender so `rx.recv()` returns None after draining.
        drop(tx);
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            // NOTE(review): Err entries are silently discarded here (they were
            // only warned about above) — confirm this best-effort listing is
            // intended, since missed entries simply won't be GC'd this round.
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Decides, for each listed entry, whether it can be deleted based on
    /// manifest membership, temporary references, linger state and whether
    /// the region was dropped.
    #[allow(clippy::too_many_arguments)]
    fn filter_deletable_files(
        &self,
        is_region_dropped: bool,
        entries: Vec<Entry>,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        may_linger_files: &HashSet<&RemovedFile>,
        eligible_for_delete: &HashSet<&RemovedFile>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> Vec<RemovedFile> {
        let mut ready_for_delete = vec![];
        // Re-index tmp refs as file id -> set of referenced index versions.
        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
            in_tmp_ref
                .iter()
                .fold(HashMap::new(), |mut acc, (file, version)| {
                    let indices = acc.entry(*file).or_default();
                    if let Some(version) = version {
                        indices.insert(*version);
                    }
                    acc
                });

        // Group lingering/eligible removed-file records by file id so both
        // parquet and per-version index lookups are cheap.
        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        for entry in entries {
            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
                Ok((file_id, file_type)) => (file_id, file_type),
                Err(err) => {
                    // Unparsable names are counted and skipped, never deleted.
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                    continue;
                }
            };

            let should_delete = match file_type {
                FileType::Parquet => {
                    // Parquet files match on file id alone.
                    let is_in_manifest = in_manifest.contains_key(&file_id);
                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
                    let is_linger = may_linger_files.contains_key(&file_id);
                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
                FileType::Puffin(version) => {
                    // Index (puffin) files must match on both file id and
                    // index version.
                    let is_in_manifest = in_manifest
                        .get(&file_id)
                        .map(|opt_ver| *opt_ver == Some(version))
                        .unwrap_or(false);
                    let is_in_tmp_ref = in_tmp_ref
                        .get(&file_id)
                        .map(|versions| versions.contains(&version))
                        .unwrap_or(false);
                    let is_linger = may_linger_files
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);
                    let is_eligible_for_delete = eligible_for_delete
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
            };

            if should_delete {
                let removed_file = match file_type {
                    FileType::Parquet => {
                        RemovedFile::File(file_id, None)
                    }
                    FileType::Puffin(version) => {
                        GC_ORPHANED_INDEX_FILES.inc();
                        RemovedFile::Index(file_id, version)
                    }
                };
                ready_for_delete.push(removed_file);
            }
        }
        ready_for_delete
    }

    /// Computes the final set of files to delete.
    ///
    /// Fast mode (`full_file_listing == false`) only considers the
    /// manifest's removed-file records that are past their lingering window;
    /// full mode cross-checks every object-store entry via
    /// [`Self::filter_deletable_files`].
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        is_region_dropped: bool,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        // Files removed after `may_linger_until` are still lingering; `None`
        // (lingering disabled) means nothing lingers.
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
        // split_off keeps entries < threshold in `recently_removed_files`
        // (old enough to delete) and returns the still-lingering remainder.
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        if !self.full_file_listing {
            // Fast mode: only delete recorded removed files that are neither
            // in the manifest nor temporarily referenced.
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file_id| {
                    let in_use = match file_id {
                        RemovedFile::File(file_id, index_version) => {
                            // NOTE(review): this compares the manifest's index
                            // version against the removal-time version; a file
                            // present in the manifest under a different index
                            // version would count as "not in use" — confirm
                            // that is intended (vs. `contains_key`).
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full mode: decide per listed object-store entry.
        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            is_region_dropped,
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
}
982
/// Computes hex-string boundaries that split an object-store key space into
/// roughly equal partitions for concurrent listing.
///
/// The requested concurrency is rounded up to the next power of two `n`; the
/// result then holds `n - 1` boundaries, each rendered as a fixed-width
/// lowercase hex string. Returns an empty vec when a single partition (no
/// splitting) suffices.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // Find the smallest hex width `digits` such that 16^digits >= n.
    let mut digits = 0;
    let mut space: u128 = 1;
    while space < n as u128 {
        space *= 16;
        digits += 1;
    }

    // Evenly spaced split points across the whole space, excluding 0 and the
    // upper end: step, 2*step, ..., (n-1)*step.
    let step = space / n as u128;
    let mut boundaries = Vec::with_capacity(n - 1);
    for i in 1..n {
        let boundary = i as u128 * step;
        boundaries.push(format!("{:0width$x}", boundary, width = digits));
    }
    boundaries
}
1018
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks partition boundaries for several concurrency values, including
    /// non-power-of-two inputs (which round up to the next power of two) and
    /// widths crossing from one hex digit to two.
    #[test]
    fn test_gen_partition_from_concurrency() {
        // concurrency 1 => single partition, no boundaries needed.
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        // 3 rounds up to 4.
        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        // 32 partitions need two hex digits per boundary.
        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}
1058}