mito2/gc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when a file is considered removed, i.e. removed from the manifest.
//! `lingering time`: how long files are kept after they are removed from the manifest before being deleted.
//! `delta manifest`: the manifest files written after the last checkpoint, which contain the changes to the manifest.
//! `delete time`: the time when a file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint dropped the remove actions that preceded it.
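//!
//! For example (an illustrative timeline using the defaults below): a file expelled from the
//! manifest at 12:00:00 with a `lingering_time` of 60s becomes eligible for deletion at
//! 12:01:00, while a file with an unknown expel time is instead kept until its object-store
//! last-modified time plus `unknown_file_lingering_time` has passed.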
//!

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::location::{self};

#[cfg(test)]
mod worker_test;

/// Limit the number of concurrent GC jobs on the datanode.
pub struct GcLimiter {
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Try to acquire a permit for a GC job.
    ///
    /// If no permit is available, returns a `TooManyGcJobs` error.
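    ///
    /// A minimal usage sketch (illustrative only, not a doctest; assumes a
    /// `Result`-returning context for `?`):
    /// ```ignore
    /// let limiter = GcLimiter::new(4);
    /// // Holding the permit occupies one GC slot until it is dropped.
    /// let _permit = limiter.permit()?;
    /// ```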
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}

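/// Options for the SST GC worker.
///
/// A construction sketch (illustrative; the field values are arbitrary, not recommendations):
/// ```ignore
/// let gc_config = GcConfig {
///     enable: true,
///     lingering_time: Some(Duration::from_secs(300)),
///     ..Default::default()
/// };
/// ```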
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    /// If set to None, unused files are deleted immediately.
    ///
    /// TODO(discord9): long running queries should actively write tmp manifest files
    /// to prevent deletion of files they are using.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with an undetermined expel time).
    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
    /// This should only occur rarely, as the manifest keeps track of removed files in its
    /// `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            // expect long running queries to finish (or at least be able to notify that they
            // are using a deleted file) within a reasonable time
            lingering_time: Some(Duration::from_secs(60)),
            // 1 hour for files with an unknown expel time (i.e. when the file was removed from
            // the manifest is unknown); this should rarely happen, so such files can be kept longer
            unknown_file_lingering_time: Duration::from_secs(60 * 60),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    /// GC options, including lingering times before deleting files.
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains the manifest versions of regions when the tmp ref files were generated,
    /// used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    _permit: OwnedSemaphorePermit,
    /// Whether to perform a full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's `removed_files`,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when orphan files need to be cleaned up.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Create a new `LocalGcWorker` for the regions in `regions_to_gc`.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Get tmp ref files for all current regions.
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Run the GC worker in serial mode, since listing files can be slow and running
    /// multiple regions in parallel may cause too many concurrent listing operations.
    ///
    /// TODO(discord9): consider instead running in parallel mode
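    ///
    /// A minimal call sketch (illustrative): the worker consumes itself and returns a
    /// `GcReport` mapping each region id to the file ids it deleted.
    /// ```ignore
    /// let report: GcReport = gc_worker.run().await?;
    /// ```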
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Roughly how many files each lister covers when listing a region directory.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Perform GC for the region:
    /// 1. Get all the removed files in delta manifest files and their expel times.
    /// 2. List all files in the region dir concurrently.
    /// 3. Filter out the files that are still in use or may still be kept for a while.
    /// 4. Delete the unused files.
    ///
    /// Files that are still in use, or whose lingering time has not yet passed, are kept
    /// to avoid deleting files that are still needed.
    pub async fn do_region_gc(
        &self,
        region: MitoRegionRef,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        let region_id = region.region_id();

        debug!("Doing gc for region {}", region_id);
        // do the time-consuming listing only when full_file_listing is true,
        // and do it first to make sure we have the latest manifest etc.
        let all_entries = if self.full_file_listing {
            self.list_from_object_store(&region).await?
        } else {
            vec![]
        };

        let manifest = region.manifest_ctx.manifest().await;
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;

        if recently_removed_files.is_empty() {
            // no recently removed files tracked in the manifest
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let in_used: HashSet<FileId> = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
            .await?;

        let unused_file_cnt = unused_files.len();

        debug!(
            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {}",
            current_files.len(),
            tmp_ref_files.len(),
            in_used.len(),
            removed_file_cnt,
            unused_files.len()
        );

        // TODO(discord9): for now, ignore async index files as their design is not stable;
        // this needs to be improved once the index file design is stable
        let file_pairs: Vec<(FileId, u64)> =
            unused_files.iter().map(|file_id| (*file_id, 0)).collect();
        // TODO(discord9): the gc worker needs another major refactor to support versioned index files

        debug!(
            "Found {} unused files to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // TODO(discord9): update the region manifest about deleted files
        self.update_manifest_removed_files(&region, unused_files.clone())
            .await?;

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate; no clean way to fix it now
        GC_DELETE_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    /// Update the region manifest to clear the files that were actually deleted.
    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<FileId>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        let cnt = deleted_files.len();
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

    /// Get all the removed files in delta manifest files and their expel times.
    /// The expel time is the time when the file is considered removed, which is the last
    /// modified time of the delta manifest containing the remove action.
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Create partitioned listers for concurrent file listing based on the concurrency level.
    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
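    ///
    /// For example (illustrative), a concurrency of 2 yields the partition prefixes `["8"]`,
    /// so the boundaries become `[None, Some("8"), None]` and two listers are built: one
    /// starting from the beginning of the region dir, and one starting after `"8"`. The end
    /// boundary of each pair is enforced later by the concurrent listing step.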
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// List all files in the region directory.
    /// Returns a vector of all file entries found.
    /// This might take a long time if there are many files in the region directory.
    async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
        let start = tokio::time::Instant::now();
        let region_id = region.region_id();
        let manifest = region.manifest_ctx.manifest().await;
        let current_files = &manifest.files;
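        // One lister per ~CONCURRENCY_LIST_PER_FILES (1024) files tracked in the manifest,
        // clamped to [1, max_concurrent_lister_per_gc_job]; e.g. 5000 tracked files -> 4 listers
        // (an illustrative figure, assuming the default limit of 32).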
        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        // Concurrently list all files in the region directory
        let all_entries = self.list_region_files_concurrent(listers).await?;
        let cnt = all_entries.len();
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} listers for {cnt} files in region {}.",
            start.elapsed().as_secs_f64(),
            region_id
        );
        Ok(all_entries)
    }

    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // reach end, stop listing
                            e.name() < end.as_str()
                        } else {
                            // no end, take all entries
                            true
                        }
                    }
                    // entry went wrong, log and skip it
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // notice that we only care about files, skip dirs
                            e.metadata().is_file()
                        } else {
                            // error entry, take for further logging
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                // ordering of files doesn't matter here, so we can send them directly
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel to stop receiving

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Filter files to determine which ones can be deleted based on usage status and lingering time.
    /// Returns the file IDs that are safe to delete, together with the lingering files that still exist.
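    ///
    /// Per-entry decision sketch (assuming the file id can be parsed from the path):
    /// - in `in_use_filenames`: keep;
    /// - in `may_linger_filenames`: keep (still within the lingering window);
    /// - in `eligible_for_removal` (its removal is tracked in the manifest): delete;
    /// - otherwise (unknown expel time): delete only if its last-modified time is before
    ///   `unknown_file_may_linger_until`.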
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it means it's not a sst or index file;
                    // we shouldn't delete it because we don't know what it is
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        // if the file's expel time is unknown (because it doesn't appear in the delta manifest),
                        // we keep it for a while based on its last modified time;
                        // notice that unknown files use a different lingering time
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        // if the file did appear in the manifest delta (and passed the previous predicates),
                        // we can delete it immediately
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

    /// Determine which files in the region dir can be deleted, given the entries
    /// (concurrently) listed from it beforehand, since there may be a lot of files
    /// in the region dir and listing them may take a long time.
    ///
    /// When `full_file_listing` is false, this method will only delete files tracked in
    /// `recently_removed_files` without performing expensive list operations, which significantly
    /// improves performance. When `full_file_listing` is true, the full listing is used to
    /// find and delete orphan files.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: &HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<FileId>> {
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
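        // `split_off(&threshold)` leaves expel times older than the threshold (old enough to
        // delete) in `recently_removed_files` and returns expel times at or after it, i.e.
        // files removed too recently and therefore still lingering.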
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // in-use filenames, including sst and index files
        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use
            // 3. Have passed the lingering time
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: filter the entries listed from the object store to
        // determine which ones can be deleted
        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}

/// Generate partition prefixes based on concurrency, assuming file names are
/// evenly-distributed uuid strings, to evenly distribute files across partitions.
/// For example, if concurrency is 2, the partition prefixes will be `["8"]`,
/// dividing uuids into two partitions based on the first character.
/// If concurrency is 32, the partition prefixes will be
/// `["08", "10", "18", "20", "28", "30", "38", ..., "f0", "f8"]`.
/// If concurrency is 1, an empty vector is returned.
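///
/// A worked example (illustrative): for a concurrency of 32, `n = 32`, so two hex characters
/// are needed (`d = 2`, key space `p = 256`) and the step is `256 / 32 = 8`, giving the
/// boundaries `0x08, 0x10, ..., 0xf8`, formatted as `"08", "10", ..., "f8"`.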
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // `d` is the number of hex characters required to build the partition key.
    // `p` is the total number of possible values for a key of length `d`.
    // We need to find the smallest `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}