mito2/gc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when a file is considered removed, i.e. removed from the manifest.
//! `lingering time`: how long files are kept after they are removed from the manifest before deletion.
//! `delta manifest`: the manifest files written after the last checkpoint, containing the changes to the manifest.
//! `delete time`: the time when a file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint removed the actions recorded before it.
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
    Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_DEL_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::location::{self, region_dir_from_table_dir};

/// Limits the number of concurrent GC jobs on the datanode.
pub struct GcLimiter {
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Try to acquire a permit for a GC job.
    ///
    /// If no permit is available, returns a `TooManyGcJobs` error.
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    ///
    /// TODO(discord9): long running queries should actively write tmp manifest files
    /// to prevent deletion of files they are using.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Duration,
    /// Lingering time before deleting unknown files (files with an undetermined expel time).
    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
    /// Unknown files should only occur rarely, since the manifest keeps track of removed files
    /// in its `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            // expect long running queries to finish within a reasonable time
            lingering_time: Duration::from_secs(60 * 5),
            // 6 hours; used when the expel time (the time the file was removed from the manifest)
            // is unknown. This should rarely happen, so keeping such files longer is acceptable.
            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
    /// GC configuration, including lingering times.
    pub(crate) opt: GcConfig,
    pub(crate) manifest_open_config: ManifestOpenConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains the manifest versions of regions at the time the tmp ref files were generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC only deletes files that are tracked in the manifest's `removed_files`,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC performs a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Creates a new `LocalGcWorker` that will GC the regions in `regions_to_gc`.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeSet<RegionId>,
        opt: GcConfig,
        manifest_open_config: ManifestOpenConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let table_id = regions_to_gc
            .first()
            .context(UnexpectedSnafu {
                reason: "Expect at least one region, found none",
            })?
            .table_id();
        let permit = limiter.permit()?;
        let mut zelf = Self {
            access_layer,
            cache_manager,
            manifest_mgrs: HashMap::new(),
            opt,
            manifest_open_config,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        };

        // dedup just in case
        for region_id in regions_to_gc {
            ensure!(
                region_id.table_id() == table_id,
                UnexpectedSnafu {
                    reason: format!(
                        "All regions should belong to the same table, found region {} and table {}",
                        region_id, table_id
                    ),
                }
            );
            let mgr = zelf.open_mgr_for(region_id).await?;
            zelf.manifest_mgrs.insert(region_id, mgr);
        }

        Ok(zelf)
    }

    /// Gets the tmp ref files for all regions handled by this worker.
    ///
    /// Outdated regions are added to the `outdated_regions` set.
    pub async fn read_tmp_ref_files(
        &self,
        outdated_regions: &mut HashSet<RegionId>,
    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
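        // A region's tmp refs are considered outdated when its manifest version has
        // advanced past the version recorded when the refs were generated.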
        for (region_id, region_mgr) in &self.manifest_mgrs {
            let current_version = region_mgr.manifest().manifest_version;
            if &current_version
                > self
                    .file_ref_manifest
                    .manifest_version
                    .get(region_id)
                    .with_context(|| UnexpectedSnafu {
                        reason: format!(
                            "Region {} not found in tmp ref manifest version map",
                            region_id
                        ),
                    })?
            {
                outdated_regions.insert(*region_id);
            }
        }
        // TODO(discord9): verify manifest version before reading tmp ref files

        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(region_id) {
                // skip outdated regions
                continue;
            }
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Runs the GC worker serially over regions, since listing files can be slow and
    /// running multiple regions in parallel may cause too many concurrent listing operations.
    ///
    /// TODO(discord9): consider instead running in parallel mode
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut outdated_regions = HashSet::new();
        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
        for region_id in self.manifest_mgrs.keys() {
            debug!("Doing gc for region {}", region_id);
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!("Gc for region {} finished", region_id);
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: outdated_regions.into_iter().collect(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Number of files each concurrent lister is expected to cover when listing a region.
    /// Used to decide how many concurrent listing operations to spawn and speed up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Performs GC for the region:
    /// 1. Get all the removed files in delta manifest files and their expel times.
    /// 2. List all files in the region dir concurrently.
    /// 3. Filter out the files that are still in use or may still be kept for a while.
    /// 4. Delete the unused files.
    ///
    /// Note that files that are still in use or may still linger are not deleted,
    /// to avoid removing files that are still needed.
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        debug!("Doing gc for region {}", region_id);
        let manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;

        if recently_removed_files.is_empty() {
            // no files to remove, skip
            debug!("No recently removed files to gc for region {}", region_id);
        }

        debug!(
            "Found {} recently removed file sets for region {}",
            recently_removed_files.len(),
            region_id
        );

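        // Spawn roughly one lister per `CONCURRENCY_LIST_PER_FILES` current files,
        // clamped to at least 1 and at most `max_concurrent_lister_per_gc_job`.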
        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let in_used = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
            .await?;

        let unused_len = unused_files.len();

        debug!(
            "Found {} unused files to delete for region {}",
            unused_len, region_id
        );

        let file_pairs: Vec<(FileId, FileId)> = unused_files
            .iter()
            .filter_map(|file_id| {
                current_files
                    .get(file_id)
                    .map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id()))
            })
            .collect();

        info!(
            "Found {} unused file pairs (data + index) to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_len, region_id
        );

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
        GC_DEL_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    /// Opens the manifest manager for the region.
    async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let mito_config = &self.manifest_open_config;

        let region_manifest_options = RegionManifestOptions {
            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
                table_dir, region_id, path_type,
            )),
            object_store: self.access_layer.object_store().clone(),
            compress_type: manifest_compress_type(mito_config.compress_manifest),
            checkpoint_distance: mito_config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
            },
        };

        RegionManifestManager::open(
            region_manifest_options,
            Default::default(),
            Default::default(),
        )
        .await?
        .context(EmptyRegionDirSnafu {
            region_id,
            region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
        })
    }

    /// Gets all the files removed in the delta manifest files, together with their expel times.
    /// The expel time is the time when a file is considered removed, taken from the
    /// `removed_at` timestamp recorded in the manifest's removed files.
    pub async fn get_removed_files_expel_times(
        &self,
        region_id: RegionId,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let region_manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();

        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Create partitioned listers for concurrent file listing based on concurrency level.
    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
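        // Boundaries for the listers: a leading `None` (open start), the generated partition
        // prefixes, and a trailing `None` (open end). Each adjacent pair becomes one lister's
        // (start_after, end) range.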
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // reached the end boundary, stop listing
                            e.name() < end.as_str()
                        } else {
                            // no end, take all entries
                            true
                        }
                    }
                    // the entry went wrong; log it and keep listing
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // notice that we only care about files, skip dirs
                            e.metadata().is_file()
                        } else {
                            // error entry, take for further logging
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                // ordering of files doesn't matter here, so we can send them directly
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel to stop receiving

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Filter files to determine which ones can be deleted based on usage status and lingering time.
    /// Returns the file IDs that are safe to delete, together with the set of files that
    /// still exist but are lingering.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it's not an sst or index file;
                    // don't delete it because we don't know what it is
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        // if the file's expel time is unknown (because it doesn't appear in the delta manifest),
                        // keep it for a while based on its last modified time;
                        // notice that unknown files use a different lingering time
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        // if the file did appear in the manifest delta (and passed the previous predicates), we can delete it immediately
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

    /// Concurrently lists unused files in the region dir, since the region dir may contain
    /// a lot of files and listing them may take a long time.
    ///
    /// When `full_file_listing` is false, this method will only delete files tracked in
    /// `recently_removed_files` without performing expensive list operations, which significantly
    /// improves performance. When `full_file_listing` is true, it performs a full listing to
    /// find and delete orphan files.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let start = tokio::time::Instant::now();
        let now = chrono::Utc::now();
        let may_linger_until = now
            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
                DurationOutOfRangeSnafu {
                    input: self.opt.lingering_time,
                }
            })?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
        let mut recently_removed_files = recently_removed_files;
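        // `split_off(&threshold)` keeps entries expelled before `may_linger_until` in
        // `recently_removed_files` and returns the entries expelled at or after it,
        // i.e. files removed too recently to be deleted yet.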
        let may_linger_files = recently_removed_files.split_off(&threshold);
        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // in use filenames, include sst and index files
        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use
            // 3. Have passed the lingering time
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
                start.elapsed().as_secs_f64(),
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: perform expensive list operations to find orphan files
        // Step 1: Create partitioned listers for concurrent processing
        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        // Step 2: Concurrently list all files in the region directory
        let all_entries = self.list_region_files_concurrent(listers).await?;

        let cnt = all_entries.len();

        // Step 3: Filter files to determine which ones can be deleted
        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
            start.elapsed().as_secs_f64(),
            region_id,
            all_unused_files_ready_for_delete.len()
        );
        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}

/// Generates partition prefixes based on concurrency, assuming file names are
/// evenly-distributed UUID strings, so that files are spread evenly across partitions.
/// For example, if concurrency is 2, the partition prefixes will be
/// ["8"], dividing UUIDs into two partitions based on the first character.
/// If concurrency is 32, the partition prefixes will be
/// ["08", "10", "18", "20", "28", "30", "38", ..., "f0", "f8"].
/// If concurrency is 1, it returns an empty vector.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // `d` is the number of hex characters required to build the partition key.
    // `p` is the total number of possible values for a key of length `d`.
    // We need to find the smallest `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
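
    // A minimal illustrative sketch of `GcLimiter` behavior: it hands out at most
    // `gc_concurrency` permits, and further `permit()` calls fail until a permit is dropped.
    #[test]
    fn test_gc_limiter_permit_sketch() {
        let limiter = GcLimiter::new(2);
        let p1 = limiter.permit().unwrap();
        let _p2 = limiter.permit().unwrap();
        assert_eq!(limiter.running_gc_tasks(), 2);
        // All permits are taken, so the next attempt is rejected.
        assert!(limiter.permit().is_err());
        // Dropping a permit frees a slot again.
        drop(p1);
        assert!(limiter.permit().is_ok());
    }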
}