1use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::{debug, error, info, warn};
30use common_time::Timestamp;
31use itertools::Itertools;
32use object_store::{Entry, Lister};
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt as _, ensure};
35use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
36use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
37use tokio_stream::StreamExt;
38
39use crate::access_layer::AccessLayerRef;
40use crate::cache::CacheManagerRef;
41use crate::cache::file_cache::FileType;
42use crate::config::MitoConfig;
43use crate::error::{
44 DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
45 TooManyGcJobsSnafu, UnexpectedSnafu,
46};
47use crate::manifest::action::{RegionManifest, RemovedFile};
48use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
49use crate::region::{MitoRegionRef, RegionRoleState};
50use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
51use crate::sst::location::{self};
52
53#[cfg(test)]
54mod worker_test;
55
56fn should_delete_file(
59 is_in_manifest: bool,
60 is_in_tmp_ref: bool,
61 is_linger: bool,
62 is_eligible_for_delete: bool,
63 is_region_dropped: bool,
64 _entry: &Entry,
65 _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
66) -> bool {
67 let is_known = is_linger || is_eligible_for_delete;
68
69 !is_in_manifest
70 && !is_in_tmp_ref
71 && if is_known {
72 is_eligible_for_delete
73 } else {
74 !is_in_tmp_ref && is_region_dropped
75 }
76}
77
/// Limits how many GC jobs may run concurrently on this node via a semaphore.
pub struct GcLimiter {
    /// One permit per allowed concurrent GC job; a job holds its permit for
    /// its whole lifetime (see `LocalGcWorker::_permit`).
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    // Total number of permits the semaphore was created with; used to derive
    // the running-task count from the available permits.
    gc_concurrency: usize,
}

/// Shared handle to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
85
86impl GcLimiter {
87 pub fn new(gc_concurrency: usize) -> Self {
88 Self {
89 gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
90 gc_concurrency,
91 }
92 }
93
94 pub fn running_gc_tasks(&self) -> u32 {
95 (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
96 }
97
98 pub fn gc_concurrency(&self) -> u32 {
99 self.gc_concurrency as u32
100 }
101
102 pub fn gc_stat(&self) -> GcStat {
103 GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
104 }
105
106 pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
110 self.gc_job_limit
111 .clone()
112 .try_acquire_owned()
113 .map_err(|e| match e {
114 TryAcquireError::Closed => UnexpectedSnafu {
115 reason: format!("Failed to acquire gc permit: {e}"),
116 }
117 .build(),
118 TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
119 })
120 }
121}
122
/// Configuration for the file garbage collector.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled at all.
    pub enable: bool,
    /// How long a file recorded as removed in the manifest keeps lingering
    /// before it becomes eligible for deletion. `None` disables lingering.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// How long a file with no removal record at all is left alone.
    // NOTE(review): currently only threaded through to `should_delete_file`,
    // which does not yet use it — confirm intended behavior.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Cap on concurrent object-store listers used by one GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Cap on concurrently running GC jobs (sizes the `GcLimiter` semaphore).
    pub max_concurrent_gc_job: usize,
}
148
149impl Default for GcConfig {
150 fn default() -> Self {
151 Self {
152 enable: false,
153 lingering_time: Some(Duration::from_secs(60)),
155 unknown_file_lingering_time: Duration::from_secs(60 * 60),
157 max_concurrent_lister_per_gc_job: 32,
158 max_concurrent_gc_job: 4,
159 }
160 }
161}
162
/// A worker that garbage-collects unused SST/index files for a set of
/// regions belonging to a single table.
pub struct LocalGcWorker {
    /// Access to the object store for the regions being collected.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager; used to purge caches for deleted files.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to collect; `None` means the region is gone (dropped) and
    /// only its leftover files remain on the object store.
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC tuning options.
    pub(crate) opt: GcConfig,
    /// Snapshot of temporarily referenced files and the manifest versions
    /// they were taken against.
    pub(crate) file_ref_manifest: FileRefsManifest,
    // Concurrency permit; held for the worker's lifetime so the limiter
    // counts this job as running until the worker is dropped.
    _permit: OwnedSemaphorePermit,
    /// When true, list the whole region directory from the object store and
    /// evaluate every entry; when false, trust only manifest bookkeeping.
    pub full_file_listing: bool,
}
185
/// Manifest-related options needed to open a region manifest, extracted
/// from the full [`MitoConfig`].
pub struct ManifestOpenConfig {
    /// Whether manifest files are compressed.
    pub compress_manifest: bool,
    /// Number of manifest actions between checkpoints.
    pub manifest_checkpoint_distance: u64,
    /// Max count of removed-file records kept in the manifest.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// TTL for removed-file records kept in the manifest.
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
192
193impl From<MitoConfig> for ManifestOpenConfig {
194 fn from(mito_config: MitoConfig) -> Self {
195 Self {
196 compress_manifest: mito_config.compress_manifest,
197 manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
198 experimental_manifest_keep_removed_file_count: mito_config
199 .experimental_manifest_keep_removed_file_count,
200 experimental_manifest_keep_removed_file_ttl: mito_config
201 .experimental_manifest_keep_removed_file_ttl,
202 }
203 }
204}
205
206impl LocalGcWorker {
207 #[allow(clippy::too_many_arguments)]
211 pub async fn try_new(
212 access_layer: AccessLayerRef,
213 cache_manager: Option<CacheManagerRef>,
214 regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
215 opt: GcConfig,
216 file_ref_manifest: FileRefsManifest,
217 limiter: &GcLimiterRef,
218 full_file_listing: bool,
219 ) -> Result<Self> {
220 if let Some(first_region_id) = regions_to_gc.keys().next() {
221 let table_id = first_region_id.table_id();
222 for region_id in regions_to_gc.keys() {
223 ensure!(
224 region_id.table_id() == table_id,
225 InvalidRequestSnafu {
226 region_id: *region_id,
227 reason: format!(
228 "Region {} does not belong to table {}",
229 region_id, table_id
230 ),
231 }
232 );
233 }
234 }
235
236 let permit = limiter.permit()?;
237
238 Ok(Self {
239 access_layer,
240 cache_manager,
241 regions: regions_to_gc,
242 opt,
243 file_ref_manifest,
244 _permit: permit,
245 full_file_listing,
246 })
247 }
248
249 pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
251 let mut tmp_ref_files = HashMap::new();
252 for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
253 tmp_ref_files
254 .entry(*region_id)
255 .or_insert_with(HashSet::new)
256 .extend(file_refs.clone());
257 }
259
260 Ok(tmp_ref_files)
261 }
262
263 pub async fn run(self) -> Result<GcReport> {
269 info!("LocalGcWorker started");
270 let now = std::time::Instant::now();
271
272 let mut deleted_files = HashMap::new();
273 let mut deleted_indexes = HashMap::new();
274 let tmp_ref_files = self.read_tmp_ref_files().await?;
275 for (region_id, region) in &self.regions {
276 let per_region_time = std::time::Instant::now();
277 if region.as_ref().map(|r| r.manifest_ctx.current_state())
278 == Some(RegionRoleState::Follower)
279 {
280 return UnexpectedSnafu {
281 reason: format!(
282 "Region {} is in Follower state, should not run GC on follower regions",
283 region_id
284 ),
285 }
286 .fail();
287 }
288 let tmp_ref_files = tmp_ref_files
289 .get(region_id)
290 .cloned()
291 .unwrap_or_else(HashSet::new);
292 let files = self
293 .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
294 .await?;
295 let index_files = files
296 .iter()
297 .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
298 .collect_vec();
299 deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
300 deleted_indexes.insert(*region_id, index_files);
301 debug!(
302 "GC for region {} took {} secs.",
303 region_id,
304 per_region_time.elapsed().as_secs_f32()
305 );
306 }
307 info!(
308 "LocalGcWorker finished after {} secs.",
309 now.elapsed().as_secs_f32()
310 );
311 let report = GcReport {
312 deleted_files,
313 deleted_indexes,
314 need_retry_regions: HashSet::new(),
315 };
316 Ok(report)
317 }
318}
319
320impl LocalGcWorker {
321 pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
324
325 pub async fn do_region_gc(
334 &self,
335 region_id: RegionId,
336 region: Option<MitoRegionRef>,
337 tmp_ref_files: &HashSet<FileRef>,
338 ) -> Result<Vec<RemovedFile>> {
339 debug!(
340 "Doing gc for region {}, {}",
341 region_id,
342 if region.is_some() {
343 "region found"
344 } else {
345 "region not found, might be dropped"
346 }
347 );
348
349 ensure!(
350 region.is_some() || self.full_file_listing,
351 InvalidRequestSnafu {
352 region_id,
353 reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
354 }
355 );
356
357 let manifest = if let Some(region) = ®ion {
358 let manifest = region.manifest_ctx.manifest().await;
359 let file_ref_manifest_version = self
361 .file_ref_manifest
362 .manifest_version
363 .get(®ion.region_id())
364 .cloned();
365 if file_ref_manifest_version != Some(manifest.manifest_version) {
366 warn!(
368 "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
369 file_ref_manifest_version,
370 region.region_id(),
371 manifest.manifest_version
372 );
373 return Ok(vec![]);
374 }
375 Some(manifest)
376 } else {
377 None
378 };
379
380 let all_entries = if let Some(manifest) = &manifest
381 && self.full_file_listing
382 {
383 self.list_from_object_store(region_id, manifest.files.len())
386 .await?
387 } else if manifest.is_none() && self.full_file_listing {
388 self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
392 .await?
393 } else {
394 vec![]
395 };
396
397 let recently_removed_files = if let Some(manifest) = &manifest {
398 self.get_removed_files_expel_times(manifest).await?
399 } else {
400 Default::default()
401 };
402
403 if recently_removed_files.is_empty() {
404 debug!("No recently removed files to gc for region {}", region_id);
406 }
407
408 let removed_file_cnt = recently_removed_files
409 .values()
410 .map(|s| s.len())
411 .sum::<usize>();
412
413 let current_files = manifest.as_ref().map(|m| &m.files);
414
415 let in_manifest = if let Some(current_files) = current_files {
416 current_files
417 .iter()
418 .map(|(file_id, meta)| (*file_id, meta.index_version()))
419 .collect::<HashMap<_, _>>()
420 } else {
421 Default::default()
422 };
423
424 let is_region_dropped = region.is_none();
425
426 let in_tmp_ref = tmp_ref_files
427 .iter()
428 .map(|file_ref| (file_ref.file_id, file_ref.index_version))
429 .collect::<HashSet<_>>();
430
431 let deletable_files = self
432 .list_to_be_deleted_files(
433 region_id,
434 is_region_dropped,
435 &in_manifest,
436 &in_tmp_ref,
437 recently_removed_files,
438 all_entries,
439 )
440 .await?;
441
442 let unused_file_cnt = deletable_files.len();
443
444 info!(
445 "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {:?}",
446 if region.is_none() {
447 "(region dropped)"
448 } else {
449 ""
450 },
451 current_files.map(|c| c.len()).unwrap_or(0),
452 tmp_ref_files.len(),
453 removed_file_cnt,
454 &deletable_files,
455 );
456
457 debug!(
458 "Found {} unused index files to delete for region {}",
459 deletable_files.len(),
460 region_id
461 );
462
463 self.delete_files(region_id, &deletable_files).await?;
464
465 debug!(
466 "Successfully deleted {} unused files for region {}",
467 unused_file_cnt, region_id
468 );
469 if let Some(region) = ®ion {
470 self.update_manifest_removed_files(region, deletable_files.clone())
471 .await?;
472 }
473
474 Ok(deletable_files)
475 }
476
477 async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
478 let mut index_ids = vec![];
479 let file_pairs = removed_files
480 .iter()
481 .filter_map(|f| match f {
482 RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
483 RemovedFile::Index(file_id, index_version) => {
484 let region_index_id =
485 RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
486 index_ids.push(region_index_id);
487 None
488 }
489 })
490 .collect_vec();
491 delete_files(
492 region_id,
493 &file_pairs,
494 true,
495 &self.access_layer,
496 &self.cache_manager,
497 )
498 .await?;
499
500 for index_id in index_ids {
501 delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
502 }
503
504 GC_DELETE_FILE_CNT.add(removed_files.len() as i64);
506
507 Ok(())
508 }
509
510 async fn update_manifest_removed_files(
512 &self,
513 region: &MitoRegionRef,
514 deleted_files: Vec<RemovedFile>,
515 ) -> Result<()> {
516 let deleted_file_cnt = deleted_files.len();
517 debug!(
518 "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
519 region.region_id()
520 );
521
522 let mut manager = region.manifest_ctx.manifest_manager.write().await;
523 let cnt = deleted_files.len();
524 manager.clear_deleted_files(deleted_files);
525 debug!(
526 "Updated region_id={} region manifest to clear {cnt} deleted files",
527 region.region_id(),
528 );
529
530 Ok(())
531 }
532
533 pub async fn get_removed_files_expel_times(
538 &self,
539 region_manifest: &Arc<RegionManifest>,
540 ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
541 let mut ret = BTreeMap::new();
542 for files in ®ion_manifest.removed_files.removed_files {
543 let expel_time = Timestamp::new_millisecond(files.removed_at);
544 let set = ret.entry(expel_time).or_insert_with(HashSet::new);
545 set.extend(files.files.iter().cloned());
546 }
547
548 Ok(ret)
549 }
550
551 async fn partition_region_files(
554 &self,
555 region_id: RegionId,
556 concurrency: usize,
557 ) -> Result<Vec<(Lister, Option<String>)>> {
558 let region_dir = self.access_layer.build_region_dir(region_id);
559
560 let partitions = gen_partition_from_concurrency(concurrency);
561 let bounds = vec![None]
562 .into_iter()
563 .chain(partitions.iter().map(|p| Some(p.clone())))
564 .chain(vec![None])
565 .collect::<Vec<_>>();
566
567 let mut listers = vec![];
568 for part in bounds.windows(2) {
569 let start = part[0].clone();
570 let end = part[1].clone();
571 let mut lister = self.access_layer.object_store().lister_with(®ion_dir);
572 if let Some(s) = start {
573 lister = lister.start_after(&s);
574 }
575
576 let lister = lister.await.context(OpenDalSnafu)?;
577 listers.push((lister, end));
578 }
579
580 Ok(listers)
581 }
582
583 async fn list_from_object_store(
587 &self,
588 region_id: RegionId,
589 file_cnt_hint: usize,
590 ) -> Result<Vec<Entry>> {
591 let start = tokio::time::Instant::now();
592 let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
593 .max(1)
594 .min(self.opt.max_concurrent_lister_per_gc_job);
595
596 let listers = self.partition_region_files(region_id, concurrency).await?;
597 let lister_cnt = listers.len();
598
599 let all_entries = self.list_region_files_concurrent(listers).await?;
601 let cnt = all_entries.len();
602 info!(
603 "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
604 start.elapsed().as_secs_f64(),
605 region_id
606 );
607 Ok(all_entries)
608 }
609
610 async fn list_region_files_concurrent(
613 &self,
614 listers: Vec<(Lister, Option<String>)>,
615 ) -> Result<Vec<Entry>> {
616 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
617 let mut handles = vec![];
618
619 for (lister, end) in listers {
620 let tx = tx.clone();
621 let handle = tokio::spawn(async move {
622 let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
623 Ok(e) => {
624 if let Some(end) = &end {
625 e.name() < end.as_str()
627 } else {
628 true
630 }
631 }
632 Err(err) => {
634 warn!("Failed to list entry: {}", err);
635 true
636 }
637 });
638 let stream = stream
639 .filter(|e| {
640 if let Ok(e) = &e {
641 e.metadata().is_file()
643 } else {
644 true
646 }
647 })
648 .collect::<Vec<_>>()
649 .await;
650 tx.send(stream).await.expect("Failed to send entries");
652 });
653
654 handles.push(handle);
655 }
656
657 for handle in handles {
659 handle.await.context(JoinSnafu)?;
660 }
661
662 drop(tx); let mut all_entries = vec![];
666 while let Some(stream) = rx.recv().await {
667 all_entries.extend(stream.into_iter().filter_map(Result::ok));
668 }
669
670 Ok(all_entries)
671 }
672
673 #[allow(clippy::too_many_arguments)]
674 fn filter_deletable_files(
675 &self,
676 is_region_dropped: bool,
677 entries: Vec<Entry>,
678 in_manifest: &HashMap<FileId, Option<IndexVersion>>,
679 in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
680 may_linger_files: &HashSet<&RemovedFile>,
681 eligible_for_delete: &HashSet<&RemovedFile>,
682 unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
683 ) -> Vec<RemovedFile> {
684 let mut ready_for_delete = vec![];
685 let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
687 in_tmp_ref
688 .iter()
689 .fold(HashMap::new(), |mut acc, (file, version)| {
690 let indices = acc.entry(*file).or_default();
691 if let Some(version) = version {
692 indices.insert(*version);
693 }
694 acc
695 });
696
697 let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
698 .iter()
699 .fold(HashMap::new(), |mut acc, file| {
700 let indices = acc.entry(file.file_id()).or_default();
701 indices.insert(file);
702 acc
703 });
704
705 let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
706 .iter()
707 .fold(HashMap::new(), |mut acc, file| {
708 let indices = acc.entry(file.file_id()).or_default();
709 indices.insert(file);
710 acc
711 });
712
713 for entry in entries {
714 let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
715 Ok((file_id, file_type)) => (file_id, file_type),
716 Err(err) => {
717 error!(err; "Failed to parse file id from path: {}", entry.name());
718 GC_SKIPPED_UNPARSABLE_FILES.inc();
721 continue;
722 }
723 };
724
725 let should_delete = match file_type {
726 FileType::Parquet => {
727 let is_in_manifest = in_manifest.contains_key(&file_id);
728 let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
729 let is_linger = may_linger_files.contains_key(&file_id);
730 let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
731
732 should_delete_file(
733 is_in_manifest,
734 is_in_tmp_ref,
735 is_linger,
736 is_eligible_for_delete,
737 is_region_dropped,
738 &entry,
739 unknown_file_may_linger_until,
740 )
741 }
742 FileType::Puffin(version) => {
743 let is_in_manifest = in_manifest
745 .get(&file_id)
746 .map(|opt_ver| *opt_ver == Some(version))
747 .unwrap_or(false);
748 let is_in_tmp_ref = in_tmp_ref
749 .get(&file_id)
750 .map(|versions| versions.contains(&version))
751 .unwrap_or(false);
752 let is_linger = may_linger_files
753 .get(&file_id)
754 .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
755 .unwrap_or(false);
756 let is_eligible_for_delete = eligible_for_delete
757 .get(&file_id)
758 .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
759 .unwrap_or(false);
760
761 should_delete_file(
762 is_in_manifest,
763 is_in_tmp_ref,
764 is_linger,
765 is_eligible_for_delete,
766 is_region_dropped,
767 &entry,
768 unknown_file_may_linger_until,
769 )
770 }
771 };
772
773 if should_delete {
774 let removed_file = match file_type {
775 FileType::Parquet => {
776 RemovedFile::File(file_id, None)
779 }
780 FileType::Puffin(version) => {
781 GC_ORPHANED_INDEX_FILES.inc();
782 RemovedFile::Index(file_id, version)
783 }
784 };
785 ready_for_delete.push(removed_file);
786 }
787 }
788 ready_for_delete
789 }
790
791 pub async fn list_to_be_deleted_files(
800 &self,
801 region_id: RegionId,
802 is_region_dropped: bool,
803 in_manifest: &HashMap<FileId, Option<IndexVersion>>,
804 in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
805 recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
806 all_entries: Vec<Entry>,
807 ) -> Result<Vec<RemovedFile>> {
808 let now = chrono::Utc::now();
809 let may_linger_until = self
810 .opt
811 .lingering_time
812 .map(|lingering_time| {
813 chrono::Duration::from_std(lingering_time)
814 .with_context(|_| DurationOutOfRangeSnafu {
815 input: lingering_time,
816 })
817 .map(|t| now - t)
818 })
819 .transpose()?;
820
821 let unknown_file_may_linger_until = now
822 - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
823 |_| DurationOutOfRangeSnafu {
824 input: self.opt.unknown_file_lingering_time,
825 },
826 )?;
827
828 let threshold =
830 may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
831 let mut recently_removed_files = recently_removed_files;
834 let may_linger_files = match threshold {
835 Some(threshold) => recently_removed_files.split_off(&threshold),
836 None => BTreeMap::new(),
837 };
838 debug!("may_linger_files: {:?}", may_linger_files);
839
840 let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();
841
842 let eligible_for_removal = recently_removed_files
845 .values()
846 .flatten()
847 .collect::<HashSet<_>>();
848
849 if !self.full_file_listing {
852 let files_to_delete: Vec<RemovedFile> = eligible_for_removal
857 .iter()
858 .filter(|file_id| {
859 let in_use = match file_id {
860 RemovedFile::File(file_id, index_version) => {
861 in_manifest.get(file_id) == Some(index_version)
862 || in_tmp_ref.contains(&(*file_id, *index_version))
863 }
864 RemovedFile::Index(file_id, index_version) => {
865 in_manifest.get(file_id) == Some(&Some(*index_version))
866 || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
867 }
868 };
869 !in_use
870 })
871 .map(|&f| f.clone())
872 .collect();
873
874 info!(
875 "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
876 region_id,
877 files_to_delete.len()
878 );
879
880 return Ok(files_to_delete);
881 }
882
883 let all_unused_files_ready_for_delete = self.filter_deletable_files(
887 is_region_dropped,
888 all_entries,
889 in_manifest,
890 in_tmp_ref,
891 &all_may_linger_files,
892 &eligible_for_removal,
893 unknown_file_may_linger_until,
894 );
895
896 Ok(all_unused_files_ready_for_delete)
897 }
898}
899
/// Generates `concurrency - 1` (rounded up to a power of two) hex-string
/// boundaries that split the hex keyspace into equally sized partitions.
///
/// Returns an empty vec when one partition suffices. Boundaries are
/// zero-padded to the minimal hex width whose keyspace holds all
/// partitions, so they compare correctly as strings.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    // Round up so the keyspace divides evenly.
    let parts = concurrency.next_power_of_two();
    if parts <= 1 {
        return vec![];
    }

    // Smallest number of hex digits whose keyspace (16^digits) covers
    // `parts` partitions.
    let mut digits = 0usize;
    let mut space: u128 = 1;
    while space < parts as u128 {
        space *= 16;
        digits += 1;
    }

    let step = space / parts as u128;
    (1..parts as u128)
        .map(|i| format!("{:0width$x}", i * step, width = digits))
        .collect()
}
935
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks partition boundaries for various concurrency values,
    /// including rounding up to the next power of two (3 -> 4) and the
    /// two-hex-digit widening at 32 partitions.
    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}