mito2/
gc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! GC worker which periodically checks and removes unused/obsolete SST files.
16//!
17//! `expel time`: the time when the file is considered as removed, as in removed from the manifest.
18//! `lingering time`: the time duration before deleting files after they are removed from manifest.
19//! `delta manifest`: the manifest files after the last checkpoint that contains the changes to the manifest.
20//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually due to a saved checkpoint which removed the actions before the checkpoint.
22//!
23
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::{debug, error, info, warn};
30use common_time::Timestamp;
31use itertools::Itertools;
32use object_store::{Entry, Lister};
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt as _, ensure};
35use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
36use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
37use tokio_stream::StreamExt;
38
39use crate::access_layer::AccessLayerRef;
40use crate::cache::CacheManagerRef;
41use crate::cache::file_cache::FileType;
42use crate::config::MitoConfig;
43use crate::error::{
44    DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
45    TooManyGcJobsSnafu, UnexpectedSnafu,
46};
47use crate::manifest::action::{RegionManifest, RemovedFile};
48use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
49use crate::region::{MitoRegionRef, RegionRoleState};
50use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
51use crate::sst::location::{self};
52
53#[cfg(test)]
54mod worker_test;
55
56/// Helper function to determine if a file should be deleted based on common logic
57/// shared between Parquet and Puffin file types.
58fn should_delete_file(
59    is_in_manifest: bool,
60    is_in_tmp_ref: bool,
61    is_linger: bool,
62    is_eligible_for_delete: bool,
63    is_region_dropped: bool,
64    _entry: &Entry,
65    _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
66) -> bool {
67    let is_known = is_linger || is_eligible_for_delete;
68
69    !is_in_manifest
70        && !is_in_tmp_ref
71        && if is_known {
72            is_eligible_for_delete
73        } else {
74            !is_in_tmp_ref && is_region_dropped
75        }
76}
77
/// Limit the amount of concurrent GC jobs on the datanode.
///
/// Backed by a semaphore created with `gc_concurrency` permits; each running
/// GC job holds one permit for its lifetime (see [`GcLimiter::permit`]).
pub struct GcLimiter {
    /// Semaphore gating concurrent GC jobs; created with `gc_concurrency` permits.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    /// Total number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared reference to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
85
86impl GcLimiter {
87    pub fn new(gc_concurrency: usize) -> Self {
88        Self {
89            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
90            gc_concurrency,
91        }
92    }
93
94    pub fn running_gc_tasks(&self) -> u32 {
95        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
96    }
97
98    pub fn gc_concurrency(&self) -> u32 {
99        self.gc_concurrency as u32
100    }
101
102    pub fn gc_stat(&self) -> GcStat {
103        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
104    }
105
106    /// Try to acquire a permit for a GC job.
107    ///
108    /// If no permit is available, returns an `TooManyGcJobs` error.
109    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
110        self.gc_job_limit
111            .clone()
112            .try_acquire_owned()
113            .map_err(|e| match e {
114                TryAcquireError::Closed => UnexpectedSnafu {
115                    reason: format!("Failed to acquire gc permit: {e}"),
116                }
117                .build(),
118                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
119            })
120    }
121}
122
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    /// If set to None, then unused files will be deleted immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with undetermined expel time).
    /// The expel time is the time when the file is considered as removed, as in removed from the manifest.
    /// This should only occur rarely, as the manifest keeps track in the `removed_files` field
    /// unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}
148
149impl Default for GcConfig {
150    fn default() -> Self {
151        Self {
152            enable: false,
153            // expect long running queries to be finished(or at least be able to notify it's using a deleted file) within a reasonable time
154            lingering_time: Some(Duration::from_secs(60)),
155            // 1 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
156            unknown_file_lingering_time: Duration::from_secs(60 * 60),
157            max_concurrent_lister_per_gc_job: 32,
158            max_concurrent_gc_job: 4,
159        }
160    }
161}
162
/// A worker that runs GC for a set of regions belonging to the same table,
/// deleting SST/index files that are referenced by neither the manifest nor
/// ongoing queries.
pub struct LocalGcWorker {
    /// Access layer used to list and delete files in object storage.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager, passed to file/index deletion to purge caches.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC; `None` means the region is not open locally (e.g. dropped).
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// Lingering time before deleting files.
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    // Held for the worker's whole lifetime to bound datanode-wide GC concurrency.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}
185
/// Subset of [`MitoConfig`] options needed when opening a region manifest.
pub struct ManifestOpenConfig {
    /// Mirrors [`MitoConfig::compress_manifest`].
    pub compress_manifest: bool,
    /// Mirrors [`MitoConfig::manifest_checkpoint_distance`].
    pub manifest_checkpoint_distance: u64,
    /// Mirrors [`MitoConfig::experimental_manifest_keep_removed_file_count`].
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Mirrors [`MitoConfig::experimental_manifest_keep_removed_file_ttl`].
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
192
193impl From<MitoConfig> for ManifestOpenConfig {
194    fn from(mito_config: MitoConfig) -> Self {
195        Self {
196            compress_manifest: mito_config.compress_manifest,
197            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
198            experimental_manifest_keep_removed_file_count: mito_config
199                .experimental_manifest_keep_removed_file_count,
200            experimental_manifest_keep_removed_file_ttl: mito_config
201                .experimental_manifest_keep_removed_file_ttl,
202        }
203    }
204}
205
impl LocalGcWorker {
    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    ///
    /// # Errors
    /// Returns `InvalidRequest` if the regions belong to different tables, and
    /// `TooManyGcJobs` if the datanode-wide GC job limit is currently exhausted.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        // All regions must belong to the same table as the first one.
        if let Some(first_region_id) = regions_to_gc.keys().next() {
            let table_id = first_region_id.table_id();
            for region_id in regions_to_gc.keys() {
                ensure!(
                    region_id.table_id() == table_id,
                    InvalidRequestSnafu {
                        region_id: *region_id,
                        reason: format!(
                            "Region {} does not belong to table {}",
                            region_id, table_id
                        ),
                    }
                );
            }
        }

        // Acquire the concurrency permit up-front; it is held (via `_permit`)
        // until the worker is dropped.
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Get tmp ref files for all current regions
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
            // no need to include manifest files here, as they are already included in region manifest
        }

        Ok(tmp_ref_files)
    }

    /// Run the GC worker in serial mode,
    /// considering list files could be slow and run multiple regions in parallel
    /// may cause too many concurrent listing operations.
    ///
    /// Returns a [`GcReport`] with the deleted files and indexes per region.
    ///
    /// TODO(discord9): consider instead running in parallel mode
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let mut deleted_indexes = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            // GC must only run on leader regions; error out on a follower.
            if region.as_ref().map(|r| r.manifest_ctx.current_state())
                == Some(RegionRoleState::Follower)
            {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self
                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
                .await?;
            // Split out deleted index files (those carrying an index version) for the report.
            let index_files = files
                .iter()
                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
                .collect_vec();
            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
            deleted_indexes.insert(*region_id, index_files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            deleted_indexes,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}
319
320impl LocalGcWorker {
    /// Number of files each lister is expected to cover when listing a region
    /// directory; used to derive the listing concurrency and speed up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
324
    /// Perform GC for the region.
    /// 1. Get all the removed files in delta manifest files and their expel times
    /// 2. List all files in the region dir concurrently
    /// 3. Filter out the files that are still in use or may still be kept for a while
    /// 4. Delete the unused files
    ///
    /// Note that the files that are still in use or may still be kept for a while are not deleted
    /// to avoid deleting files that are still needed.
    ///
    /// Returns the list of files that were deleted.
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        region: Option<MitoRegionRef>,
        tmp_ref_files: &HashSet<FileRef>,
    ) -> Result<Vec<RemovedFile>> {
        debug!(
            "Doing gc for region {}, {}",
            region_id,
            if region.is_some() {
                "region found"
            } else {
                "region not found, might be dropped"
            }
        );

        // A missing region means there is no manifest to consult, so GC is only
        // safe when a full listing lets us inspect every file directly.
        ensure!(
            region.is_some() || self.full_file_listing,
            InvalidRequestSnafu {
                region_id,
                reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
            }
        );

        let manifest = if let Some(region) = &region {
            let manifest = region.manifest_ctx.manifest().await;
            // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
            let file_ref_manifest_version = self
                .file_ref_manifest
                .manifest_version
                .get(&region.region_id())
                .cloned();
            if file_ref_manifest_version != Some(manifest.manifest_version) {
                // should be rare enough(few seconds after leader update manifest version), just skip gc for this region
                warn!(
                    "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
                    file_ref_manifest_version,
                    region.region_id(),
                    manifest.manifest_version
                );
                return Ok(vec![]);
            }
            Some(manifest)
        } else {
            None
        };

        let all_entries = if let Some(manifest) = &manifest
            && self.full_file_listing
        {
            // do the time consuming listing only when full_file_listing is true(and region is open)
            // and do it first to make sure we have the latest manifest etc.
            self.list_from_object_store(region_id, manifest.files.len())
                .await?
        } else if manifest.is_none() && self.full_file_listing {
            // if region is already dropped, we have no manifest to refer to,
            // so only do gc if `full_file_listing` is true, otherwise just skip it
            // TODO(discord9): is doing one serial listing enough here?
            self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
                .await?
        } else {
            vec![]
        };

        // Files the manifest recorded as removed, grouped by expel time.
        let recently_removed_files = if let Some(manifest) = &manifest {
            self.get_removed_files_expel_times(manifest).await?
        } else {
            Default::default()
        };

        if recently_removed_files.is_empty() {
            // no files to remove, skip
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let current_files = manifest.as_ref().map(|m| &m.files);

        // File id -> index version for every file the manifest still references.
        let in_manifest = if let Some(current_files) = current_files {
            current_files
                .iter()
                .map(|(file_id, meta)| (*file_id, meta.index_version()))
                .collect::<HashMap<_, _>>()
        } else {
            Default::default()
        };

        let is_region_dropped = region.is_none();

        // Files still referenced by ongoing queries (from the tmp ref manifest).
        let in_tmp_ref = tmp_ref_files
            .iter()
            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
            .collect::<HashSet<_>>();

        let deletable_files = self
            .list_to_be_deleted_files(
                region_id,
                is_region_dropped,
                &in_manifest,
                &in_tmp_ref,
                recently_removed_files,
                all_entries,
            )
            .await?;

        let unused_file_cnt = deletable_files.len();

        info!(
            "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {:?}",
            if region.is_none() {
                "(region dropped)"
            } else {
                ""
            },
            current_files.map(|c| c.len()).unwrap_or(0),
            tmp_ref_files.len(),
            removed_file_cnt,
            &deletable_files,
        );

        debug!(
            "Found {} unused index files to delete for region {}",
            deletable_files.len(),
            region_id
        );

        self.delete_files(region_id, &deletable_files).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // Record what was actually deleted so the manifest can drop these
        // entries from its removed-files tracking.
        if let Some(region) = &region {
            self.update_manifest_removed_files(region, deletable_files.clone())
                .await?;
        }

        Ok(deletable_files)
    }
476
477    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
478        let mut index_ids = vec![];
479        let file_pairs = removed_files
480            .iter()
481            .filter_map(|f| match f {
482                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
483                RemovedFile::Index(file_id, index_version) => {
484                    let region_index_id =
485                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
486                    index_ids.push(region_index_id);
487                    None
488                }
489            })
490            .collect_vec();
491        delete_files(
492            region_id,
493            &file_pairs,
494            true,
495            &self.access_layer,
496            &self.cache_manager,
497        )
498        .await?;
499
500        for index_id in index_ids {
501            delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
502        }
503
504        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
505        GC_DELETE_FILE_CNT.add(removed_files.len() as i64);
506
507        Ok(())
508    }
509
510    /// Update region manifest for clear the actually deleted files
511    async fn update_manifest_removed_files(
512        &self,
513        region: &MitoRegionRef,
514        deleted_files: Vec<RemovedFile>,
515    ) -> Result<()> {
516        let deleted_file_cnt = deleted_files.len();
517        debug!(
518            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
519            region.region_id()
520        );
521
522        let mut manager = region.manifest_ctx.manifest_manager.write().await;
523        let cnt = deleted_files.len();
524        manager.clear_deleted_files(deleted_files);
525        debug!(
526            "Updated region_id={} region manifest to clear {cnt} deleted files",
527            region.region_id(),
528        );
529
530        Ok(())
531    }
532
533    /// Get all the removed files in delta manifest files and their expel times.
534    /// The expel time is the time when the file is considered as removed.
535    /// Which is the last modified time of delta manifest which contains the remove action.
536    ///
537    pub async fn get_removed_files_expel_times(
538        &self,
539        region_manifest: &Arc<RegionManifest>,
540    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
541        let mut ret = BTreeMap::new();
542        for files in &region_manifest.removed_files.removed_files {
543            let expel_time = Timestamp::new_millisecond(files.removed_at);
544            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
545            set.extend(files.files.iter().cloned());
546        }
547
548        Ok(ret)
549    }
550
    /// Create partitioned listers for concurrent file listing based on concurrency level.
    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
    ///
    /// Each lister begins after the previous partition boundary; the paired
    /// `end_boundary` (`None` for the last partition) is enforced later by the
    /// consumer of the lister, not here.
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        // Boundary sequence: None, p1, p2, ..., pn, None — consecutive pairs
        // form the (start, end) range of each partition.
        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                // Start listing after the partition's start boundary.
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }
582
583    /// List all files in the region directory.
584    /// Returns a vector of all file entries found.
585    /// This might take a long time if there are many files in the region directory.
586    async fn list_from_object_store(
587        &self,
588        region_id: RegionId,
589        file_cnt_hint: usize,
590    ) -> Result<Vec<Entry>> {
591        let start = tokio::time::Instant::now();
592        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
593            .max(1)
594            .min(self.opt.max_concurrent_lister_per_gc_job);
595
596        let listers = self.partition_region_files(region_id, concurrency).await?;
597        let lister_cnt = listers.len();
598
599        // Step 2: Concurrently list all files in the region directory
600        let all_entries = self.list_region_files_concurrent(listers).await?;
601        let cnt = all_entries.len();
602        info!(
603            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
604            start.elapsed().as_secs_f64(),
605            region_id
606        );
607        Ok(all_entries)
608    }
609
    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    ///
    /// Each lister runs in its own task, stops at its `end` boundary (if any),
    /// and sends its collected entries through a channel exactly once; results
    /// are flattened after all tasks are joined.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // reach end, stop listing
                            e.name() < end.as_str()
                        } else {
                            // no end, take all entries
                            true
                        }
                    }
                    // entry went wrong, log and skip it
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // notice that we only care about files, skip dirs
                            e.metadata().is_file()
                        } else {
                            // error entry, take for further logging
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                // ordering of files doesn't matter here, so we can send them directly.
                // Each task sends exactly one message, so joining the tasks before
                // draining the receiver is safe as long as the number of listers
                // stays below the channel capacity (1024).
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel to stop receiving

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }
672
    /// Decide which of the listed `entries` can be deleted, based on manifest
    /// membership, temporary query references, and lingering state.
    ///
    /// Returns the subset of files that are safe to delete. Unparsable entries
    /// are skipped (and counted) since their purpose is unknown.
    #[allow(clippy::too_many_arguments)]
    fn filter_deletable_files(
        &self,
        is_region_dropped: bool,
        entries: Vec<Entry>,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        may_linger_files: &HashSet<&RemovedFile>,
        eligible_for_delete: &HashSet<&RemovedFile>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> Vec<RemovedFile> {
        let mut ready_for_delete = vec![];
        // all group by file id for easier checking
        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
            in_tmp_ref
                .iter()
                .fold(HashMap::new(), |mut acc, (file, version)| {
                    let indices = acc.entry(*file).or_default();
                    if let Some(version) = version {
                        indices.insert(*version);
                    }
                    acc
                });

        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        for entry in entries {
            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
                Ok((file_id, file_type)) => (file_id, file_type),
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it means it's not a sst or index file
                    // shouldn't delete it because we don't know what it is
                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                    continue;
                }
            };

            let should_delete = match file_type {
                FileType::Parquet => {
                    // For data files, membership is keyed by file id alone.
                    let is_in_manifest = in_manifest.contains_key(&file_id);
                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
                    let is_linger = may_linger_files.contains_key(&file_id);
                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
                FileType::Puffin(version) => {
                    // notice need to check both file id and version
                    let is_in_manifest = in_manifest
                        .get(&file_id)
                        .map(|opt_ver| *opt_ver == Some(version))
                        .unwrap_or(false);
                    let is_in_tmp_ref = in_tmp_ref
                        .get(&file_id)
                        .map(|versions| versions.contains(&version))
                        .unwrap_or(false);
                    let is_linger = may_linger_files
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);
                    let is_eligible_for_delete = eligible_for_delete
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
            };

            if should_delete {
                let removed_file = match file_type {
                    FileType::Parquet => {
                        // notice this cause we don't track index version for parquet files
                        // since entries comes from listing, we can't get index version from path
                        RemovedFile::File(file_id, None)
                    }
                    FileType::Puffin(version) => {
                        GC_ORPHANED_INDEX_FILES.inc();
                        RemovedFile::Index(file_id, version)
                    }
                };
                ready_for_delete.push(removed_file);
            }
        }
        ready_for_delete
    }
790
    /// Lists files to be deleted based on their presence in the manifest, temporary
    /// references, and recently removed files.
    ///
    /// Returns a vector of `RemovedFile` that are eligible for deletion.
    ///
    /// When `full_file_listing` is false, this method only deletes (a subset of) files tracked in
    /// `recently_removed_files`, which significantly
    /// improves performance. When `full_file_listing` is true, it reads from `all_entries` to find
    /// and delete orphan files (files not tracked in the manifest).
    ///
    /// # Arguments
    /// * `region_id` - region the candidate files belong to (used for logging here).
    /// * `is_region_dropped` - whether the region was dropped; forwarded to
    ///   `filter_deletable_files` in full-listing mode.
    /// * `in_manifest` - files currently referenced by the manifest, with their index version.
    /// * `in_tmp_ref` - files (with optional index version) held by temporary references.
    /// * `recently_removed_files` - removed files keyed by their expel time (removal from manifest).
    /// * `all_entries` - object-store listing; only consulted when `full_file_listing` is true.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        is_region_dropped: bool,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        // Cut-off instant for lingering: files expelled at or after this instant must be
        // kept for now. `None` means no lingering time is configured (nothing lingers).
        // `from_std` fails for durations out of chrono's range, hence the error context.
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        // Unknown files (not tracked in the manifest) get a separate, always-present cut-off.
        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        // TODO(discord9): if region is already closed, maybe handle threshold differently?
        // is consider all files to be recently removed acceptable?
        let mut recently_removed_files = recently_removed_files;
        // `split_off` moves entries with expel time >= threshold (too recent, must linger)
        // into `may_linger_files`, leaving the older, past-lingering entries behind in
        // `recently_removed_files`.
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        // known files(tracked in removed files field) that are eligible for removal
        // (passed lingering time)
        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use(in manifest or tmp ref)
            // 3. Have passed the lingering time
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file_id| {
                    let in_use = match file_id {
                        RemovedFile::File(file_id, index_version) => {
                            // NOTE(review): a data file whose manifest entry carries a
                            // *different* index version than the removal record counts as
                            // "not in use" here — confirm a file id is never re-added to
                            // the manifest with a new index version.
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            // Index files are in use only when both the id and the exact
                            // version match the manifest or a temporary reference.
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: get the full list of files from object store

        // Step 3: Filter files to determine which ones can be deleted
        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            is_region_dropped,
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
898}
899
/// Builds hex-prefix partition boundaries so that evenly-distributed uuid file
/// names are split into `concurrency` (rounded up to a power of two) buckets.
///
/// Examples:
/// * concurrency 2  -> `["8"]` (split the uuid space on the first hex digit)
/// * concurrency 32 -> `["08", "10", "18", ..., "f0", "f8"]`
/// * concurrency 1  -> `[]` (no partitioning needed)
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let parts = concurrency.next_power_of_two() as u128;
    if parts <= 1 {
        return vec![];
    }

    // Find the smallest key width (in hex digits) whose key space covers `parts`
    // buckets, i.e. the smallest `width` with 16^width >= parts.
    let mut width = 0usize;
    let mut space: u128 = 1;
    while space < parts {
        space *= 16;
        width += 1;
    }

    // Evenly spaced interior boundaries; `parts` buckets need `parts - 1` of them.
    let stride = space / parts;
    (1..parts)
        .map(|idx| format!("{:0width$x}", idx * stride, width = width))
        .collect()
}
935
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        // Concurrency of 1 requires no partitioning at all.
        assert!(gen_partition_from_concurrency(1).is_empty());

        // Single-hex-digit boundaries; non-power-of-two inputs round up to the
        // next power of two, so 3 and 4 produce identical partitions.
        let single_digit_cases: &[(usize, &[&str])] = &[
            (2, &["8"]),
            (3, &["4", "8", "c"]),
            (4, &["4", "8", "c"]),
            (8, &["2", "4", "6", "8", "a", "c", "e"]),
            (
                16,
                &[
                    "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f",
                ],
            ),
        ];
        for (concurrency, expected) in single_digit_cases {
            assert_eq!(gen_partition_from_concurrency(*concurrency), *expected);
        }

        // Beyond 16 partitions, two hex digits are needed: 31 boundaries at
        // multiples of 8 across the 256-value space.
        let expected_32: Vec<String> = (1..32u32).map(|i| format!("{:02x}", i * 8)).collect();
        assert_eq!(gen_partition_from_concurrency(32), expected_32);
    }
}