mito2/
gc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when the file is considered as removed, as in removed from the manifest.
//! `lingering time`: the time to wait before deleting files after they are removed from the manifest.
//! `delta manifest`: the manifest files after the last checkpoint that contain the changes to the manifest.
//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint discarded the remove actions recorded before it.
22//!
23
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::{debug, error, info, warn};
30use common_time::Timestamp;
31use itertools::Itertools;
32use object_store::{Entry, Lister};
33use serde::{Deserialize, Serialize};
34use snafu::ResultExt as _;
35use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
36use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
37use tokio_stream::StreamExt;
38
39use crate::access_layer::AccessLayerRef;
40use crate::cache::CacheManagerRef;
41use crate::cache::file_cache::FileType;
42use crate::config::MitoConfig;
43use crate::error::{
44    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
45};
46use crate::manifest::action::{RegionManifest, RemovedFile};
47use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
48use crate::region::{MitoRegionRef, RegionRoleState};
49use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
50use crate::sst::location::{self};
51
52#[cfg(test)]
53mod worker_test;
54
55/// Helper function to determine if a file should be deleted based on common logic
56/// shared between Parquet and Puffin file types.
57fn should_delete_file(
58    is_in_manifest: bool,
59    is_in_tmp_ref: bool,
60    is_linger: bool,
61    is_eligible_for_delete: bool,
62    entry: &Entry,
63    unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
64) -> bool {
65    let is_known = is_linger || is_eligible_for_delete;
66
67    let is_unknown_linger_time_exceeded = || {
68        // if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while
69        // using it's last modified time
70        // notice unknown files use a different lingering time
71        entry
72            .metadata()
73            .last_modified()
74            .map(|t| t < unknown_file_may_linger_until)
75            .unwrap_or(false)
76    };
77
78    !is_in_manifest
79        && !is_in_tmp_ref
80        && if is_known {
81            is_eligible_for_delete
82        } else {
83            is_unknown_linger_time_exceeded()
84        }
85}
86
87/// Limit the amount of concurrent GC jobs on the datanode
88pub struct GcLimiter {
89    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
90    gc_concurrency: usize,
91}
92
93pub type GcLimiterRef = Arc<GcLimiter>;
94
95impl GcLimiter {
96    pub fn new(gc_concurrency: usize) -> Self {
97        Self {
98            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
99            gc_concurrency,
100        }
101    }
102
103    pub fn running_gc_tasks(&self) -> u32 {
104        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
105    }
106
107    pub fn gc_concurrency(&self) -> u32 {
108        self.gc_concurrency as u32
109    }
110
111    pub fn gc_stat(&self) -> GcStat {
112        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
113    }
114
115    /// Try to acquire a permit for a GC job.
116    ///
117    /// If no permit is available, returns an `TooManyGcJobs` error.
118    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
119        self.gc_job_limit
120            .clone()
121            .try_acquire_owned()
122            .map_err(|e| match e {
123                TryAcquireError::Closed => UnexpectedSnafu {
124                    reason: format!("Failed to acquire gc permit: {e}"),
125                }
126                .build(),
127                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
128            })
129    }
130}
131
/// Configuration of the GC worker.
///
/// Because of `#[serde(default)]`, fields missing from the serialized config fall back to
/// the values from `GcConfig::default()`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files that have been removed from the manifest.
    /// Should be long enough to allow long-running queries to finish.
    /// If set to `None`, unused files are deleted immediately, without lingering.
    ///
    /// TODO(discord9): long running queries should actively write tmp manifest files
    /// to prevent deletion of files they are using.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with an undetermined expel time).
    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
    /// This should occur only rarely, because the manifest keeps track of removed files in its
    /// `removed_files` field unless something goes wrong.
    /// For unknown files, this lingering time is measured from the file's last-modified time.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent list operations per GC job.
    /// Listing concurrently speeds up listing; this caps how many listers a job may use.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of concurrent GC jobs on the datanode.
    /// Prevents too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}
159
160impl Default for GcConfig {
161    fn default() -> Self {
162        Self {
163            enable: false,
164            // expect long running queries to be finished(or at least be able to notify it's using a deleted file) within a reasonable time
165            lingering_time: Some(Duration::from_secs(60)),
166            // 1 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
167            unknown_file_lingering_time: Duration::from_secs(60 * 60),
168            max_concurrent_lister_per_gc_job: 32,
169            max_concurrent_gc_job: 4,
170        }
171    }
172}
173
/// Runs GC over a set of regions on the local datanode.
pub struct LocalGcWorker {
    /// Access layer used to build region directories, list region files, and delete files.
    pub(crate) access_layer: AccessLayerRef,
    /// Cache manager passed along when deleting files and indexes, if any.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC, keyed by region id.
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    /// GC options (lingering times, lister concurrency, ...).
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Permit taken from the datanode's `GcLimiter` in `try_new`; held for the lifetime of the
    /// worker so this job counts against the concurrent GC job limit.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}
196
/// Manifest-related options copied out of a [`MitoConfig`] (see its `From<MitoConfig>` impl).
pub struct ManifestOpenConfig {
    /// Copied from `MitoConfig::compress_manifest`.
    pub compress_manifest: bool,
    /// Copied from `MitoConfig::manifest_checkpoint_distance`.
    pub manifest_checkpoint_distance: u64,
    /// Copied from `MitoConfig::experimental_manifest_keep_removed_file_count`.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Copied from `MitoConfig::experimental_manifest_keep_removed_file_ttl`.
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
203
204impl From<MitoConfig> for ManifestOpenConfig {
205    fn from(mito_config: MitoConfig) -> Self {
206        Self {
207            compress_manifest: mito_config.compress_manifest,
208            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
209            experimental_manifest_keep_removed_file_count: mito_config
210                .experimental_manifest_keep_removed_file_count,
211            experimental_manifest_keep_removed_file_ttl: mito_config
212                .experimental_manifest_keep_removed_file_ttl,
213        }
214    }
215}
216
217impl LocalGcWorker {
218    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
219    /// The regions are specified by their `RegionId` and should all belong to the same table.
220    ///
221    #[allow(clippy::too_many_arguments)]
222    pub async fn try_new(
223        access_layer: AccessLayerRef,
224        cache_manager: Option<CacheManagerRef>,
225        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
226        opt: GcConfig,
227        file_ref_manifest: FileRefsManifest,
228        limiter: &GcLimiterRef,
229        full_file_listing: bool,
230    ) -> Result<Self> {
231        let permit = limiter.permit()?;
232
233        Ok(Self {
234            access_layer,
235            cache_manager,
236            regions: regions_to_gc,
237            opt,
238            file_ref_manifest,
239            _permit: permit,
240            full_file_listing,
241        })
242    }
243
244    /// Get tmp ref files for all current regions
245    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
246        let mut tmp_ref_files = HashMap::new();
247        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
248            tmp_ref_files
249                .entry(*region_id)
250                .or_insert_with(HashSet::new)
251                .extend(file_refs.clone());
252            // no need to include manifest files here, as they are already included in region manifest
253        }
254
255        Ok(tmp_ref_files)
256    }
257
258    /// Run the GC worker in serial mode,
259    /// considering list files could be slow and run multiple regions in parallel
260    /// may cause too many concurrent listing operations.
261    ///
262    /// TODO(discord9): consider instead running in parallel mode
263    pub async fn run(self) -> Result<GcReport> {
264        info!("LocalGcWorker started");
265        let now = std::time::Instant::now();
266
267        let mut deleted_files = HashMap::new();
268        let mut deleted_indexes = HashMap::new();
269        let tmp_ref_files = self.read_tmp_ref_files().await?;
270        for (region_id, region) in &self.regions {
271            let per_region_time = std::time::Instant::now();
272            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
273                return UnexpectedSnafu {
274                    reason: format!(
275                        "Region {} is in Follower state, should not run GC on follower regions",
276                        region_id
277                    ),
278                }
279                .fail();
280            }
281            let tmp_ref_files = tmp_ref_files
282                .get(region_id)
283                .cloned()
284                .unwrap_or_else(HashSet::new);
285            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
286            let index_files = files
287                .iter()
288                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
289                .collect_vec();
290            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
291            deleted_indexes.insert(*region_id, index_files);
292            debug!(
293                "GC for region {} took {} secs.",
294                region_id,
295                per_region_time.elapsed().as_secs_f32()
296            );
297        }
298        info!(
299            "LocalGcWorker finished after {} secs.",
300            now.elapsed().as_secs_f32()
301        );
302        let report = GcReport {
303            deleted_files,
304            deleted_indexes,
305            need_retry_regions: HashSet::new(),
306        };
307        Ok(report)
308    }
309}
310
311impl LocalGcWorker {
    /// Number of manifest files each concurrent lister is expected to cover when listing a
    /// region directory.
    ///
    /// The lister count for a region is the number of files in its manifest divided by this
    /// value, bounded by `GcConfig::max_concurrent_lister_per_gc_job`. More listers speed up
    /// listing of large regions while keeping concurrency bounded.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
315
316    /// Perform GC for the region.
317    /// 1. Get all the removed files in delta manifest files and their expel times
318    /// 2. List all files in the region dir concurrently
319    /// 3. Filter out the files that are still in use or may still be kept for a while
320    /// 4. Delete the unused files
321    ///
322    /// Note that the files that are still in use or may still be kept for a while are not deleted
323    /// to avoid deleting files that are still needed.
324    pub async fn do_region_gc(
325        &self,
326        region: MitoRegionRef,
327        tmp_ref_files: &HashSet<FileRef>,
328    ) -> Result<Vec<RemovedFile>> {
329        let region_id = region.region_id();
330
331        debug!("Doing gc for region {}", region_id);
332
333        let manifest = region.manifest_ctx.manifest().await;
334        // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
335        let file_ref_manifest_version = self
336            .file_ref_manifest
337            .manifest_version
338            .get(&region.region_id())
339            .cloned();
340        if file_ref_manifest_version != Some(manifest.manifest_version) {
341            // should be rare enough(few seconds after leader update manifest version), just skip gc for this region
342            warn!(
343                "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
344                file_ref_manifest_version,
345                region.region_id(),
346                manifest.manifest_version
347            );
348            return Ok(vec![]);
349        }
350
351        // do the time consuming listing only when full_file_listing is true
352        // and do it first to make sure we have the latest manifest etc.
353        let all_entries = if self.full_file_listing {
354            self.list_from_object_store(region.region_id(), manifest.clone())
355                .await?
356        } else {
357            vec![]
358        };
359
360        let region_id = manifest.metadata.region_id;
361        let current_files = &manifest.files;
362
363        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;
364
365        if recently_removed_files.is_empty() {
366            // no files to remove, skip
367            debug!("No recently removed files to gc for region {}", region_id);
368        }
369
370        let removed_file_cnt = recently_removed_files
371            .values()
372            .map(|s| s.len())
373            .sum::<usize>();
374
375        let in_manifest = current_files
376            .iter()
377            .map(|(file_id, meta)| (*file_id, meta.index_version()))
378            .collect::<HashMap<_, _>>();
379
380        let in_tmp_ref = tmp_ref_files
381            .iter()
382            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
383            .collect::<HashSet<_>>();
384
385        let deletable_files = self
386            .list_to_be_deleted_files(
387                region_id,
388                &in_manifest,
389                &in_tmp_ref,
390                recently_removed_files,
391                all_entries,
392            )
393            .await?;
394
395        let unused_file_cnt = deletable_files.len();
396
397        debug!(
398            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {} ",
399            current_files.len(),
400            tmp_ref_files.len(),
401            removed_file_cnt,
402            deletable_files.len()
403        );
404
405        debug!(
406            "Found {} unused index files to delete for region {}",
407            deletable_files.len(),
408            region_id
409        );
410
411        self.delete_files(region_id, &deletable_files).await?;
412
413        debug!(
414            "Successfully deleted {} unused files for region {}",
415            unused_file_cnt, region_id
416        );
417        self.update_manifest_removed_files(&region, deletable_files.clone())
418            .await?;
419
420        Ok(deletable_files)
421    }
422
423    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
424        let mut index_ids = vec![];
425        let file_pairs = removed_files
426            .iter()
427            .filter_map(|f| match f {
428                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
429                RemovedFile::Index(file_id, index_version) => {
430                    let region_index_id =
431                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
432                    index_ids.push(region_index_id);
433                    None
434                }
435            })
436            .collect_vec();
437        delete_files(
438            region_id,
439            &file_pairs,
440            true,
441            &self.access_layer,
442            &self.cache_manager,
443        )
444        .await?;
445
446        for index_id in index_ids {
447            delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
448        }
449
450        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
451        GC_DELETE_FILE_CNT.add(removed_files.len() as i64);
452
453        Ok(())
454    }
455
456    /// Update region manifest for clear the actually deleted files
457    async fn update_manifest_removed_files(
458        &self,
459        region: &MitoRegionRef,
460        deleted_files: Vec<RemovedFile>,
461    ) -> Result<()> {
462        let deleted_file_cnt = deleted_files.len();
463        debug!(
464            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
465            region.region_id()
466        );
467
468        let mut manager = region.manifest_ctx.manifest_manager.write().await;
469        let cnt = deleted_files.len();
470        manager.clear_deleted_files(deleted_files);
471        debug!(
472            "Updated region_id={} region manifest to clear {cnt} deleted files",
473            region.region_id(),
474        );
475
476        Ok(())
477    }
478
479    /// Get all the removed files in delta manifest files and their expel times.
480    /// The expel time is the time when the file is considered as removed.
481    /// Which is the last modified time of delta manifest which contains the remove action.
482    ///
483    pub async fn get_removed_files_expel_times(
484        &self,
485        region_manifest: &Arc<RegionManifest>,
486    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
487        let mut ret = BTreeMap::new();
488        for files in &region_manifest.removed_files.removed_files {
489            let expel_time = Timestamp::new_millisecond(files.removed_at);
490            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
491            set.extend(files.files.iter().cloned());
492        }
493
494        Ok(ret)
495    }
496
497    /// Create partitioned listers for concurrent file listing based on concurrency level.
498    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
499    async fn partition_region_files(
500        &self,
501        region_id: RegionId,
502        concurrency: usize,
503    ) -> Result<Vec<(Lister, Option<String>)>> {
504        let region_dir = self.access_layer.build_region_dir(region_id);
505
506        let partitions = gen_partition_from_concurrency(concurrency);
507        let bounds = vec![None]
508            .into_iter()
509            .chain(partitions.iter().map(|p| Some(p.clone())))
510            .chain(vec![None])
511            .collect::<Vec<_>>();
512
513        let mut listers = vec![];
514        for part in bounds.windows(2) {
515            let start = part[0].clone();
516            let end = part[1].clone();
517            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
518            if let Some(s) = start {
519                lister = lister.start_after(&s);
520            }
521
522            let lister = lister.await.context(OpenDalSnafu)?;
523            listers.push((lister, end));
524        }
525
526        Ok(listers)
527    }
528
529    /// List all files in the region directory.
530    /// Returns a vector of all file entries found.
531    /// This might take a long time if there are many files in the region directory.
532    async fn list_from_object_store(
533        &self,
534        region_id: RegionId,
535        manifest: Arc<RegionManifest>,
536    ) -> Result<Vec<Entry>> {
537        let start = tokio::time::Instant::now();
538        let current_files = &manifest.files;
539        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
540            .max(1)
541            .min(self.opt.max_concurrent_lister_per_gc_job);
542
543        let listers = self.partition_region_files(region_id, concurrency).await?;
544        let lister_cnt = listers.len();
545
546        // Step 2: Concurrently list all files in the region directory
547        let all_entries = self.list_region_files_concurrent(listers).await?;
548        let cnt = all_entries.len();
549        info!(
550            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
551            start.elapsed().as_secs_f64(),
552            region_id
553        );
554        Ok(all_entries)
555    }
556
557    /// Concurrently list all files in the region directory using the provided listers.
558    /// Returns a vector of all file entries found across all partitions.
559    async fn list_region_files_concurrent(
560        &self,
561        listers: Vec<(Lister, Option<String>)>,
562    ) -> Result<Vec<Entry>> {
563        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
564        let mut handles = vec![];
565
566        for (lister, end) in listers {
567            let tx = tx.clone();
568            let handle = tokio::spawn(async move {
569                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
570                    Ok(e) => {
571                        if let Some(end) = &end {
572                            // reach end, stop listing
573                            e.name() < end.as_str()
574                        } else {
575                            // no end, take all entries
576                            true
577                        }
578                    }
579                    // entry went wrong, log and skip it
580                    Err(err) => {
581                        warn!("Failed to list entry: {}", err);
582                        true
583                    }
584                });
585                let stream = stream
586                    .filter(|e| {
587                        if let Ok(e) = &e {
588                            // notice that we only care about files, skip dirs
589                            e.metadata().is_file()
590                        } else {
591                            // error entry, take for further logging
592                            true
593                        }
594                    })
595                    .collect::<Vec<_>>()
596                    .await;
597                // ordering of files doesn't matter here, so we can send them directly
598                tx.send(stream).await.expect("Failed to send entries");
599            });
600
601            handles.push(handle);
602        }
603
604        // Wait for all listers to finish
605        for handle in handles {
606            handle.await.context(JoinSnafu)?;
607        }
608
609        drop(tx); // Close the channel to stop receiving
610
611        // Collect all entries from the channel
612        let mut all_entries = vec![];
613        while let Some(stream) = rx.recv().await {
614            all_entries.extend(stream.into_iter().filter_map(Result::ok));
615        }
616
617        Ok(all_entries)
618    }
619
620    fn filter_deletable_files(
621        &self,
622        entries: Vec<Entry>,
623        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
624        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
625        may_linger_files: &HashSet<&RemovedFile>,
626        eligible_for_delete: &HashSet<&RemovedFile>,
627        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
628    ) -> Vec<RemovedFile> {
629        let mut ready_for_delete = vec![];
630        // all group by file id for easier checking
631        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
632            in_tmp_ref
633                .iter()
634                .fold(HashMap::new(), |mut acc, (file, version)| {
635                    let indices = acc.entry(*file).or_default();
636                    if let Some(version) = version {
637                        indices.insert(*version);
638                    }
639                    acc
640                });
641
642        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
643            .iter()
644            .fold(HashMap::new(), |mut acc, file| {
645                let indices = acc.entry(file.file_id()).or_default();
646                indices.insert(file);
647                acc
648            });
649
650        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
651            .iter()
652            .fold(HashMap::new(), |mut acc, file| {
653                let indices = acc.entry(file.file_id()).or_default();
654                indices.insert(file);
655                acc
656            });
657
658        for entry in entries {
659            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
660                Ok((file_id, file_type)) => (file_id, file_type),
661                Err(err) => {
662                    error!(err; "Failed to parse file id from path: {}", entry.name());
663                    // if we can't parse the file id, it means it's not a sst or index file
664                    // shouldn't delete it because we don't know what it is
665                    GC_SKIPPED_UNPARSABLE_FILES.inc();
666                    continue;
667                }
668            };
669
670            let should_delete = match file_type {
671                FileType::Parquet => {
672                    let is_in_manifest = in_manifest.contains_key(&file_id);
673                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
674                    let is_linger = may_linger_files.contains_key(&file_id);
675                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
676
677                    should_delete_file(
678                        is_in_manifest,
679                        is_in_tmp_ref,
680                        is_linger,
681                        is_eligible_for_delete,
682                        &entry,
683                        unknown_file_may_linger_until,
684                    )
685                }
686                FileType::Puffin(version) => {
687                    // notice need to check both file id and version
688                    let is_in_manifest = in_manifest
689                        .get(&file_id)
690                        .map(|opt_ver| *opt_ver == Some(version))
691                        .unwrap_or(false);
692                    let is_in_tmp_ref = in_tmp_ref
693                        .get(&file_id)
694                        .map(|versions| versions.contains(&version))
695                        .unwrap_or(false);
696                    let is_linger = may_linger_files
697                        .get(&file_id)
698                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
699                        .unwrap_or(false);
700                    let is_eligible_for_delete = eligible_for_delete
701                        .get(&file_id)
702                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
703                        .unwrap_or(false);
704
705                    should_delete_file(
706                        is_in_manifest,
707                        is_in_tmp_ref,
708                        is_linger,
709                        is_eligible_for_delete,
710                        &entry,
711                        unknown_file_may_linger_until,
712                    )
713                }
714            };
715
716            if should_delete {
717                let removed_file = match file_type {
718                    FileType::Parquet => {
719                        // notice this cause we don't track index version for parquet files
720                        // since entries comes from listing, we can't get index version from path
721                        RemovedFile::File(file_id, None)
722                    }
723                    FileType::Puffin(version) => {
724                        GC_ORPHANED_INDEX_FILES.inc();
725                        RemovedFile::Index(file_id, version)
726                    }
727                };
728                ready_for_delete.push(removed_file);
729            }
730        }
731        ready_for_delete
732    }
733
734    /// List files to be deleted based on their presence in the manifest, temporary references, and recently removed files.
735    /// Returns a vector of `RemovedFile` that are eligible for deletion.
736    ///
737    /// When `full_file_listing` is false, this method will only delete (subset of) files tracked in
738    /// `recently_removed_files`, which significantly
739    /// improves performance. When `full_file_listing` is true, it read from `all_entries` to find
740    /// and delete orphan files (files not tracked in the manifest).
741    ///
742    pub async fn list_to_be_deleted_files(
743        &self,
744        region_id: RegionId,
745        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
746        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
747        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
748        all_entries: Vec<Entry>,
749    ) -> Result<Vec<RemovedFile>> {
750        let now = chrono::Utc::now();
751        let may_linger_until = self
752            .opt
753            .lingering_time
754            .map(|lingering_time| {
755                chrono::Duration::from_std(lingering_time)
756                    .with_context(|_| DurationOutOfRangeSnafu {
757                        input: lingering_time,
758                    })
759                    .map(|t| now - t)
760            })
761            .transpose()?;
762
763        let unknown_file_may_linger_until = now
764            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
765                |_| DurationOutOfRangeSnafu {
766                    input: self.opt.unknown_file_lingering_time,
767                },
768            )?;
769
770        // files that may linger, which means they are not in use but may still be kept for a while
771        let threshold =
772            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
773        let mut recently_removed_files = recently_removed_files;
774        let may_linger_files = match threshold {
775            Some(threshold) => recently_removed_files.split_off(&threshold),
776            None => BTreeMap::new(),
777        };
778        debug!("may_linger_files: {:?}", may_linger_files);
779
780        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();
781
782        // known files(tracked in removed files field) that are eligible for removal
783        // (passed lingering time)
784        let eligible_for_removal = recently_removed_files
785            .values()
786            .flatten()
787            .collect::<HashSet<_>>();
788
789        // When full_file_listing is false, skip expensive list operations and only delete
790        // files that are tracked in recently_removed_files
791        if !self.full_file_listing {
792            // Only delete files that:
793            // 1. Are in recently_removed_files (tracked in manifest)
794            // 2. Are not in use(in manifest or tmp ref)
795            // 3. Have passed the lingering time
796            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
797                .iter()
798                .filter(|file_id| {
799                    let in_use = match file_id {
800                        RemovedFile::File(file_id, index_version) => {
801                            in_manifest.get(file_id) == Some(index_version)
802                                || in_tmp_ref.contains(&(*file_id, *index_version))
803                        }
804                        RemovedFile::Index(file_id, index_version) => {
805                            in_manifest.get(file_id) == Some(&Some(*index_version))
806                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
807                        }
808                    };
809                    !in_use
810                })
811                .map(|&f| f.clone())
812                .collect();
813
814            info!(
815                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
816                region_id,
817                files_to_delete.len()
818            );
819
820            return Ok(files_to_delete);
821        }
822
823        // Full file listing mode: get the full list of files from object store
824
825        // Step 3: Filter files to determine which ones can be deleted
826        let all_unused_files_ready_for_delete = self.filter_deletable_files(
827            all_entries,
828            in_manifest,
829            in_tmp_ref,
830            &all_may_linger_files,
831            &eligible_for_removal,
832            unknown_file_may_linger_until,
833        );
834
835        Ok(all_unused_files_ready_for_delete)
836    }
837}
838
/// Compute hex prefixes that split the file-name space into `concurrency` even partitions.
///
/// File names are assumed to be uniformly distributed UUID strings, so cutting the space at
/// evenly spaced leading hex digits spreads files evenly. `concurrency` is first rounded up to
/// the next power of two `n`, and the `n - 1` boundaries between consecutive partitions are
/// returned in ascending order. Each boundary is written in lowercase hex, zero-padded to the
/// smallest digit count `d` with `16^d >= n`.
///
/// Examples:
/// - concurrency 2 gives `["8"]`;
/// - concurrency 3 or 4 gives `["4", "8", "c"]`;
/// - concurrency 32 gives `["08", "10", "18", ..., "f0", "f8"]`.
///
/// A concurrency of 0 or 1 means a single partition, so the result is empty.
///
/// # Panics
///
/// In debug builds, if `concurrency` exceeds the largest power of two representable in `usize`
/// (inherited from `usize::next_power_of_two`).
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let partitions = concurrency.next_power_of_two();
    if partitions <= 1 {
        return Vec::new();
    }

    // `partitions` is a power of two, i.e. exactly 2^bits.
    let bits = partitions.trailing_zeros();
    // One hex digit carries 4 bits; take just enough digits to address every partition.
    let digits = bits.div_ceil(4);
    // The key space holds 2^(4 * digits) values; each partition spans 2^(4 * digits - bits).
    let stride: u128 = 1 << (4 * digits - bits);

    let mut boundaries = Vec::with_capacity(partitions - 1);
    for index in 1..partitions as u128 {
        boundaries.push(format!(
            "{:0width$x}",
            index * stride,
            width = digits as usize
        ));
    }
    boundaries
}
874
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }

    #[test]
    fn test_gen_partition_from_concurrency_edge_cases() {
        // Zero concurrency is treated like a single partition: nothing to split.
        assert!(gen_partition_from_concurrency(0).is_empty());

        // Non-power-of-two concurrency rounds up to the next power of two.
        assert_eq!(
            gen_partition_from_concurrency(5),
            gen_partition_from_concurrency(8)
        );
        assert_eq!(
            gen_partition_from_concurrency(17),
            gen_partition_from_concurrency(32)
        );

        // More than 16 * 16 partitions needs a third hex digit.
        let partitions = gen_partition_from_concurrency(512);
        assert_eq!(partitions.len(), 511);
        assert_eq!(partitions.first().map(String::as_str), Some("008"));
        assert_eq!(partitions.last().map(String::as_str), Some("ff8"));
    }

    #[test]
    fn test_gen_partition_from_concurrency_invariants() {
        for concurrency in 1..=300usize {
            let partitions = gen_partition_from_concurrency(concurrency);
            assert_eq!(partitions.len(), concurrency.next_power_of_two() - 1);
            // All prefixes share one width, so string order matches numeric order.
            if let Some(first) = partitions.first() {
                assert!(partitions.iter().all(|p| p.len() == first.len()));
            }
            assert!(partitions.windows(2).all(|w| w[0] < w[1]));
        }
    }
}