Skip to main content

mito2/
gc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when a file is considered removed, i.e. removed from the manifest.
//! `lingering time`: how long to wait before deleting a file after it has been removed from the manifest.
//! `delta manifest`: the manifest files written after the last checkpoint, which contain the changes to the manifest.
//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint dropped the remove actions that preceded it.
22//!
23
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::tracing::Instrument as _;
30use common_telemetry::{debug, error, info, warn};
31use common_time::Timestamp;
32use itertools::Itertools;
33use object_store::{Entry, ErrorKind, Lister};
34use serde::{Deserialize, Serialize};
35use snafu::{ResultExt as _, ensure};
36use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
37use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
38use tokio_stream::StreamExt;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheManagerRef;
42use crate::cache::file_cache::FileType;
43use crate::config::MitoConfig;
44use crate::error::{
45    DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
46    TooManyGcJobsSnafu, UnexpectedSnafu,
47};
48use crate::manifest::action::{RegionManifest, RemovedFile};
49use crate::metrics::{
50    GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
51    GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
52};
53use crate::region::{MitoRegionRef, RegionRoleState};
54use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes};
55use crate::sst::location::{self};
56
57#[cfg(test)]
58mod worker_test;
59
60/// Helper function to determine if a file should be deleted based on common logic
61/// shared between Parquet and Puffin file types.
62fn should_delete_file(
63    is_in_manifest: bool,
64    is_in_tmp_ref: bool,
65    is_linger: bool,
66    is_eligible_for_delete: bool,
67    is_region_dropped: bool,
68    _entry: &Entry,
69    _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71    let is_known = is_linger || is_eligible_for_delete;
72
73    !is_in_manifest
74        && !is_in_tmp_ref
75        && if is_known {
76            is_eligible_for_delete
77        } else {
78            !is_in_tmp_ref && is_region_dropped
79        }
80}
81
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
    /// Semaphore created with `gc_concurrency` permits; each GC job takes one permit.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    /// Number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared handle to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91    pub fn new(gc_concurrency: usize) -> Self {
92        Self {
93            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94            gc_concurrency,
95        }
96    }
97
98    pub fn running_gc_tasks(&self) -> u32 {
99        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100    }
101
102    pub fn gc_concurrency(&self) -> u32 {
103        self.gc_concurrency as u32
104    }
105
106    pub fn gc_stat(&self) -> GcStat {
107        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108    }
109
110    /// Try to acquire a permit for a GC job.
111    ///
112    /// If no permit is available, returns an `TooManyGcJobs` error.
113    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114        self.gc_job_limit
115            .clone()
116            .try_acquire_owned()
117            .map_err(|e| match e {
118                TryAcquireError::Closed => UnexpectedSnafu {
119                    reason: format!("Failed to acquire gc permit: {e}"),
120                }
121                .build(),
122                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123            })
124    }
125}
126
/// Options controlling the GC worker.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    /// If set to None, then unused files will be deleted immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files whose expel time cannot be determined).
    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
    /// This should occur only rarely, as the manifest keeps track of removed files in its
    /// `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154    fn default() -> Self {
155        Self {
156            enable: false,
157            // expect long running queries to be finished(or at least be able to notify it's using a deleted file) within a reasonable time
158            lingering_time: Some(Duration::from_secs(60)),
159            // 1 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
160            unknown_file_lingering_time: Duration::from_secs(60 * 60),
161            max_concurrent_lister_per_gc_job: 32,
162            max_concurrent_gc_job: 4,
163        }
164    }
165}
166
/// Worker that garbage-collects files for a set of regions.
pub struct LocalGcWorker {
    /// Access layer used to reach the object store and to build region directories.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager, forwarded to the file/index deletion helpers.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC. A `None` value means the region was not found and might have been dropped.
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC options.
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// GC job permit obtained from the `GcLimiter` in `try_new`; stored for as long as
    /// the worker exists.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}
189
/// Manifest-related options; every field is copied from the `MitoConfig` field of the
/// same name by the `From<MitoConfig>` implementation.
pub struct ManifestOpenConfig {
    /// Presumably whether manifest files are compressed — confirm against `MitoConfig`.
    pub compress_manifest: bool,
    /// Presumably the number of manifest actions between checkpoints — confirm against `MitoConfig`.
    pub manifest_checkpoint_distance: u64,
    /// Experimental option; see the `MitoConfig` field of the same name.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Experimental option; see the `MitoConfig` field of the same name.
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198    fn from(mito_config: MitoConfig) -> Self {
199        Self {
200            compress_manifest: mito_config.compress_manifest,
201            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202            experimental_manifest_keep_removed_file_count: mito_config
203                .experimental_manifest_keep_removed_file_count,
204            experimental_manifest_keep_removed_file_ttl: mito_config
205                .experimental_manifest_keep_removed_file_ttl,
206        }
207    }
208}
209
210impl LocalGcWorker {
211    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
212    /// The regions are specified by their `RegionId` and should all belong to the same table.
213    ///
214    #[allow(clippy::too_many_arguments)]
215    pub async fn try_new(
216        access_layer: AccessLayerRef,
217        cache_manager: Option<CacheManagerRef>,
218        regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
219        opt: GcConfig,
220        file_ref_manifest: FileRefsManifest,
221        limiter: &GcLimiterRef,
222        full_file_listing: bool,
223    ) -> Result<Self> {
224        if let Some(first_region_id) = regions_to_gc.keys().next() {
225            let table_id = first_region_id.table_id();
226            for region_id in regions_to_gc.keys() {
227                ensure!(
228                    region_id.table_id() == table_id,
229                    InvalidRequestSnafu {
230                        region_id: *region_id,
231                        reason: format!(
232                            "Region {} does not belong to table {}",
233                            region_id, table_id
234                        ),
235                    }
236                );
237            }
238        }
239
240        let permit = limiter.permit()?;
241
242        Ok(Self {
243            access_layer,
244            cache_manager,
245            regions: regions_to_gc,
246            opt,
247            file_ref_manifest,
248            _permit: permit,
249            full_file_listing,
250        })
251    }
252
253    /// Get tmp ref files for all current regions
254    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
255        let mut tmp_ref_files = HashMap::new();
256        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
257            tmp_ref_files
258                .entry(*region_id)
259                .or_insert_with(HashSet::new)
260                .extend(file_refs.clone());
261            // no need to include manifest files here, as they are already included in region manifest
262        }
263
264        Ok(tmp_ref_files)
265    }
266
267    /// Run the GC worker in serial mode,
268    /// considering list files could be slow and run multiple regions in parallel
269    /// may cause too many concurrent listing operations.
270    ///
271    /// TODO(discord9): consider instead running in parallel mode
272    #[common_telemetry::tracing::instrument(
273        skip_all,
274        fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
275    )]
276    pub async fn run(self) -> Result<GcReport> {
277        info!("LocalGcWorker started");
278        let _timer = GC_DURATION_SECONDS
279            .with_label_values(&["total"])
280            .start_timer();
281        let now = std::time::Instant::now();
282
283        let mut deleted_files = HashMap::new();
284        let mut deleted_indexes = HashMap::new();
285        let mut processed_regions = HashSet::new();
286        let tmp_ref_files = self.read_tmp_ref_files().await?;
287        for (region_id, region) in &self.regions {
288            let per_region_time = std::time::Instant::now();
289            if region.as_ref().map(|r| r.manifest_ctx.current_state())
290                == Some(RegionRoleState::Follower)
291            {
292                return UnexpectedSnafu {
293                    reason: format!(
294                        "Region {} is in Follower state, should not run GC on follower regions",
295                        region_id
296                    ),
297                }
298                .fail();
299            }
300            let tmp_ref_files = tmp_ref_files
301                .get(region_id)
302                .cloned()
303                .unwrap_or_else(HashSet::new);
304            let files = self
305                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
306                .await?;
307            let index_files = files
308                .iter()
309                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
310                .collect_vec();
311            let data_files = files
312                .into_iter()
313                .filter_map(|f| match f {
314                    RemovedFile::File(file_id, _) => Some(file_id),
315                    RemovedFile::Index(_, _) => None,
316                })
317                .collect();
318            deleted_files.insert(*region_id, data_files);
319            deleted_indexes.insert(*region_id, index_files);
320            processed_regions.insert(*region_id);
321            debug!(
322                "GC for region {} took {} secs.",
323                region_id,
324                per_region_time.elapsed().as_secs_f32()
325            );
326        }
327        info!(
328            "LocalGcWorker finished after {} secs.",
329            now.elapsed().as_secs_f32()
330        );
331        let report = GcReport {
332            deleted_files,
333            deleted_indexes,
334            need_retry_regions: HashSet::new(),
335            processed_regions,
336        };
337        Ok(report)
338    }
339}
340
341impl LocalGcWorker {
    /// Number of files per lister: `list_from_object_store` divides its file count hint
    /// by this value to decide how many listers to run concurrently.
    /// It is also passed as the file count hint itself when a region has no manifest.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
345
346    /// Perform GC for the region.
347    /// 1. Get all the removed files in delta manifest files and their expel times
348    /// 2. List all files in the region dir concurrently
349    /// 3. Filter out the files that are still in use or may still be kept for a while
350    /// 4. Delete the unused files
351    ///
352    /// Note that the files that are still in use or may still be kept for a while are not deleted
353    /// to avoid deleting files that are still needed.
354    #[common_telemetry::tracing::instrument(
355        skip_all,
356        fields(
357            region_id = %region_id,
358            full_file_listing = self.full_file_listing,
359            region_present = region.is_some()
360        )
361    )]
362    pub async fn do_region_gc(
363        &self,
364        region_id: RegionId,
365        region: Option<MitoRegionRef>,
366        tmp_ref_files: &HashSet<FileRef>,
367    ) -> Result<Vec<RemovedFile>> {
368        let mode = if self.full_file_listing {
369            "full_listing"
370        } else {
371            "fast"
372        };
373        GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
374        debug!(
375            "Doing gc for region {}, {}",
376            region_id,
377            if region.is_some() {
378                "region found"
379            } else {
380                "region not found, might be dropped"
381            }
382        );
383
384        ensure!(
385            region.is_some() || self.full_file_listing,
386            InvalidRequestSnafu {
387                region_id,
388                reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
389            }
390        );
391
392        let manifest = if let Some(region) = &region {
393            let manifest = region.manifest_ctx.manifest().await;
394            // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
395            let file_ref_manifest_version = self
396                .file_ref_manifest
397                .manifest_version
398                .get(&region.region_id())
399                .cloned();
400            if file_ref_manifest_version != Some(manifest.manifest_version) {
401                // should be rare enough(few seconds after leader update manifest version), just skip gc for this region
402                warn!(
403                    "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
404                    file_ref_manifest_version,
405                    region.region_id(),
406                    manifest.manifest_version
407                );
408                GC_ERRORS_TOTAL
409                    .with_label_values(&["manifest_mismatch"])
410                    .inc();
411                return Ok(vec![]);
412            }
413            Some(manifest)
414        } else {
415            None
416        };
417
418        let all_entries = if let Some(manifest) = &manifest
419            && self.full_file_listing
420        {
421            // do the time consuming listing only when full_file_listing is true(and region is open)
422            // and do it first to make sure we have the latest manifest etc.
423            self.list_from_object_store(region_id, manifest.files.len())
424                .await?
425        } else if manifest.is_none() && self.full_file_listing {
426            // if region is already dropped, we have no manifest to refer to,
427            // so only do gc if `full_file_listing` is true, otherwise just skip it
428            // TODO(discord9): is doing one serial listing enough here?
429            self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
430                .await?
431        } else {
432            vec![]
433        };
434
435        let recently_removed_files = if let Some(manifest) = &manifest {
436            self.get_removed_files_expel_times(manifest).await?
437        } else {
438            Default::default()
439        };
440
441        if recently_removed_files.is_empty() {
442            // no files to remove, skip
443            debug!("No recently removed files to gc for region {}", region_id);
444        }
445
446        let removed_file_cnt = recently_removed_files
447            .values()
448            .map(|s| s.len())
449            .sum::<usize>();
450
451        let current_files = manifest.as_ref().map(|m| &m.files);
452
453        let in_manifest = if let Some(current_files) = current_files {
454            current_files
455                .iter()
456                .map(|(file_id, meta)| (*file_id, meta.index_version()))
457                .collect::<HashMap<_, _>>()
458        } else {
459            Default::default()
460        };
461
462        let is_region_dropped = region.is_none();
463
464        let in_tmp_ref = tmp_ref_files
465            .iter()
466            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
467            .collect::<HashSet<_>>();
468
469        let deletable_files = self
470            .list_to_be_deleted_files(
471                region_id,
472                is_region_dropped,
473                &in_manifest,
474                &in_tmp_ref,
475                recently_removed_files,
476                all_entries,
477            )
478            .await?;
479
480        let unused_file_cnt = deletable_files.len();
481
482        info!(
483            "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
484            if region.is_none() {
485                "(region dropped)"
486            } else {
487                ""
488            },
489            current_files.map(|c| c.len()).unwrap_or(0),
490            tmp_ref_files.len(),
491            removed_file_cnt,
492            deletable_files.len(),
493        );
494        debug!(
495            "gc: deletable files for region {}: {:?}",
496            region_id, &deletable_files
497        );
498
499        debug!(
500            "Found {} unused index files to delete for region {}",
501            deletable_files.len(),
502            region_id
503        );
504
505        let _delete_timer = GC_DURATION_SECONDS
506            .with_label_values(&["delete_files"])
507            .start_timer();
508        self.delete_files(region_id, &deletable_files).await?;
509
510        debug!(
511            "Successfully deleted {} unused files for region {}",
512            unused_file_cnt, region_id
513        );
514        if let Some(region) = &region {
515            let _update_timer = GC_DURATION_SECONDS
516                .with_label_values(&["update_manifest"])
517                .start_timer();
518            self.update_manifest_removed_files(region, deletable_files.clone())
519                .await?;
520        }
521
522        Ok(deletable_files)
523    }
524
525    #[common_telemetry::tracing::instrument(
526        skip_all,
527        fields(region_id = %region_id, removed_file_count = removed_files.len())
528    )]
529    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
530        let mut index_ids = vec![];
531        let file_pairs = removed_files
532            .iter()
533            .filter_map(|f| match f {
534                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
535                RemovedFile::Index(file_id, index_version) => {
536                    let region_index_id =
537                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
538                    index_ids.push(region_index_id);
539                    None
540                }
541            })
542            .collect_vec();
543        delete_files(
544            region_id,
545            &file_pairs,
546            true,
547            &self.access_layer,
548            &self.cache_manager,
549        )
550        .await?;
551
552        if !file_pairs.is_empty() {
553            let deleted_count = file_pairs.len() as u64;
554            GC_FILES_DELETED_TOTAL
555                .with_label_values(&["parquet"])
556                .inc_by(deleted_count);
557            GC_DELETE_FILE_CNT.inc_by(deleted_count);
558        }
559
560        if !index_ids.is_empty() {
561            let deleted_count = index_ids.len() as u64;
562            delete_indexes(&index_ids, &self.access_layer, &self.cache_manager)
563                .await
564                .inspect_err(|_| {
565                    GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
566                })?;
567            GC_FILES_DELETED_TOTAL
568                .with_label_values(&["index"])
569                .inc_by(deleted_count);
570            GC_DELETE_FILE_CNT.inc_by(deleted_count);
571        }
572
573        Ok(())
574    }
575
576    /// Update region manifest for clear the actually deleted files
577    #[common_telemetry::tracing::instrument(
578        skip_all,
579        fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
580    )]
581    async fn update_manifest_removed_files(
582        &self,
583        region: &MitoRegionRef,
584        deleted_files: Vec<RemovedFile>,
585    ) -> Result<()> {
586        let deleted_file_cnt = deleted_files.len();
587        debug!(
588            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
589            region.region_id()
590        );
591
592        let mut manager = region.manifest_ctx.manifest_manager.write().await;
593        let cnt = deleted_files.len();
594        manager.clear_deleted_files(deleted_files);
595        debug!(
596            "Updated region_id={} region manifest to clear {cnt} deleted files",
597            region.region_id(),
598        );
599
600        Ok(())
601    }
602
603    /// Get all the removed files in delta manifest files and their expel times.
604    /// The expel time is the time when the file is considered as removed.
605    /// Which is the last modified time of delta manifest which contains the remove action.
606    ///
607    pub async fn get_removed_files_expel_times(
608        &self,
609        region_manifest: &Arc<RegionManifest>,
610    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
611        let mut ret = BTreeMap::new();
612        for files in &region_manifest.removed_files.removed_files {
613            let expel_time = Timestamp::new_millisecond(files.removed_at);
614            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
615            set.extend(files.files.iter().cloned());
616        }
617
618        Ok(ret)
619    }
620
621    /// Create partitioned listers for concurrent file listing based on concurrency level.
622    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
623    async fn partition_region_files(
624        &self,
625        region_id: RegionId,
626        concurrency: usize,
627    ) -> Result<Vec<(Lister, Option<String>)>> {
628        let region_dir = self.access_layer.build_region_dir(region_id);
629
630        let partitions = gen_partition_from_concurrency(concurrency);
631        let bounds = vec![None]
632            .into_iter()
633            .chain(partitions.iter().map(|p| Some(p.clone())))
634            .chain(vec![None])
635            .collect::<Vec<_>>();
636
637        let mut listers = vec![];
638        for part in bounds.windows(2) {
639            let start = part[0].clone();
640            let end = part[1].clone();
641            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
642            if let Some(s) = start {
643                lister = lister.start_after(&s);
644            }
645
646            let lister = lister.await.context(OpenDalSnafu)?;
647            listers.push((lister, end));
648        }
649
650        Ok(listers)
651    }
652
653    /// List all files in the region directory.
654    /// Returns a vector of all file entries found.
655    /// This might take a long time if there are many files in the region directory.
656    #[common_telemetry::tracing::instrument(
657        skip_all,
658        fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
659    )]
660    async fn list_from_object_store(
661        &self,
662        region_id: RegionId,
663        file_cnt_hint: usize,
664    ) -> Result<Vec<Entry>> {
665        let _timer = GC_DURATION_SECONDS
666            .with_label_values(&["list_files"])
667            .start_timer();
668        let start = tokio::time::Instant::now();
669        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
670            .max(1)
671            .min(self.opt.max_concurrent_lister_per_gc_job);
672
673        let listers = self
674            .partition_region_files(region_id, concurrency)
675            .await
676            .inspect_err(|_| {
677                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
678            })?;
679        let lister_cnt = listers.len();
680
681        // Step 2: Concurrently list all parquet files in the region root directory
682        let mut all_entries = self
683            .list_region_files_concurrent(listers)
684            .await
685            .inspect_err(|_| {
686                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
687            })?;
688        let root_cnt = all_entries.len();
689
690        // Step 2b: Flat-list region_dir/index/ for puffin files.
691        // This is NOT a recursive listing — we only list the index/
692        // subdirectory to avoid scanning nested dirs/staging/blob/cache.
693        let index_entries = self
694            .list_region_index_files(region_id)
695            .await
696            .inspect_err(|_| {
697                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
698            })?;
699        let index_cnt = index_entries.len();
700        all_entries.extend(index_entries);
701        info!(
702            "gc: full listing mode cost {} secs using {lister_cnt} lister for root={root_cnt} index={index_cnt} files in region {}.",
703            start.elapsed().as_secs_f64(),
704            region_id
705        );
706        Ok(all_entries)
707    }
708
709    /// Flat-list puffin files from `region_dir/index/`.
710    /// If the index directory does not exist, returns an empty vec without error.
711    /// Only `.puffin` files (not subdirectories) are included.
712    async fn list_region_index_files(&self, region_id: RegionId) -> Result<Vec<Entry>> {
713        let region_dir = self.access_layer.build_region_dir(region_id);
714        let index_dir = object_store::util::join_dir(&region_dir, "index");
715
716        let mut lister = match self
717            .access_layer
718            .object_store()
719            .lister_with(&index_dir)
720            .await
721        {
722            Ok(l) => l,
723            Err(e) if e.kind() == ErrorKind::NotFound => {
724                // Index dir may not exist — that's fine, just log and return empty.
725                // object-store backends (especially filesystem) may error on
726                // non-existent directories.
727                debug!(
728                    "Index directory not found for region {}: {}. Treating as empty.",
729                    region_id, e
730                );
731                return Ok(vec![]);
732            }
733            Err(e) => return Err(e).context(OpenDalSnafu),
734        };
735
736        let mut entries = Vec::new();
737        while let Some(entry) = lister.next().await {
738            let entry = entry.context(OpenDalSnafu)?;
739            if entry.metadata().is_file() && entry.name().ends_with(".puffin") {
740                entries.push(entry);
741            }
742        }
743
744        Ok(entries)
745    }
746
747    /// Concurrently list all files in the region directory using the provided listers.
748    /// Returns a vector of all file entries found across all partitions.
749    async fn list_region_files_concurrent(
750        &self,
751        listers: Vec<(Lister, Option<String>)>,
752    ) -> Result<Vec<Entry>> {
753        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
754        let mut handles = vec![];
755
756        for (lister, end) in listers {
757            let tx = tx.clone();
758            let handle = tokio::spawn(
759                async move {
760                    let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
761                        Ok(e) => {
762                            if let Some(end) = &end {
763                                // reach end, stop listing
764                                e.name() < end.as_str()
765                            } else {
766                                // no end, take all entries
767                                true
768                            }
769                        }
770                        // Entry went wrong. Keep listing so the error can be propagated below
771                        // instead of returning a partial listing as success.
772                        Err(err) => {
773                            warn!("Failed to list entry: {}", err);
774                            true
775                        }
776                    });
777                    let stream = stream
778                        .filter(|e| {
779                            if let Ok(e) = &e {
780                                // notice that we only care about files, skip dirs
781                                e.metadata().is_file()
782                            } else {
783                                // error entry, take for further logging
784                                true
785                            }
786                        })
787                        .collect::<Vec<_>>()
788                        .await;
789                    // ordering of files doesn't matter here, so we can send them directly
790                    tx.send(stream).await.expect("Failed to send entries");
791                }
792                .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
793            );
794
795            handles.push(handle);
796        }
797
798        // Wait for all listers to finish
799        for handle in handles {
800            handle.await.context(JoinSnafu)?;
801        }
802
803        drop(tx); // Close the channel to stop receiving
804
805        // Collect all entries from the channel
806        let mut all_entries = vec![];
807        while let Some(stream) = rx.recv().await {
808            for entry in stream {
809                all_entries.push(entry.context(OpenDalSnafu)?);
810            }
811        }
812
813        Ok(all_entries)
814    }
815
816    #[allow(clippy::too_many_arguments)]
817    fn filter_deletable_files(
818        &self,
819        is_region_dropped: bool,
820        entries: Vec<Entry>,
821        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
822        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
823        may_linger_files: &HashSet<&RemovedFile>,
824        eligible_for_delete: &HashSet<&RemovedFile>,
825        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
826    ) -> Vec<RemovedFile> {
827        let mut ready_for_delete = vec![];
828        // all group by file id for easier checking
829        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
830            in_tmp_ref
831                .iter()
832                .fold(HashMap::new(), |mut acc, (file, version)| {
833                    let indices = acc.entry(*file).or_default();
834                    if let Some(version) = version {
835                        indices.insert(*version);
836                    }
837                    acc
838                });
839
840        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
841            .iter()
842            .fold(HashMap::new(), |mut acc, file| {
843                let indices = acc.entry(file.file_id()).or_default();
844                indices.insert(file);
845                acc
846            });
847
848        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
849            .iter()
850            .fold(HashMap::new(), |mut acc, file| {
851                let indices = acc.entry(file.file_id()).or_default();
852                indices.insert(file);
853                acc
854            });
855
856        for entry in entries {
857            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
858                Ok((file_id, file_type)) => (file_id, file_type),
859                Err(err) => {
860                    error!(err; "Failed to parse file id from path: {}", entry.name());
861                    // if we can't parse the file id, it means it's not a sst or index file
862                    // shouldn't delete it because we don't know what it is
863                    GC_SKIPPED_UNPARSABLE_FILES.inc();
864                    continue;
865                }
866            };
867
868            let should_delete = match file_type {
869                FileType::Parquet => {
870                    let is_in_manifest = in_manifest.contains_key(&file_id);
871                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
872                    let is_linger = may_linger_files.contains_key(&file_id);
873                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
874
875                    should_delete_file(
876                        is_in_manifest,
877                        is_in_tmp_ref,
878                        is_linger,
879                        is_eligible_for_delete,
880                        is_region_dropped,
881                        &entry,
882                        unknown_file_may_linger_until,
883                    )
884                }
885                FileType::Puffin(version) => {
886                    // notice need to check both file id and version
887                    let is_in_manifest = in_manifest
888                        .get(&file_id)
889                        .map(|opt_ver| *opt_ver == Some(version))
890                        .unwrap_or(false);
891                    let is_in_tmp_ref = in_tmp_ref
892                        .get(&file_id)
893                        .map(|versions| versions.contains(&version))
894                        .unwrap_or(false);
895                    let is_linger = may_linger_files
896                        .get(&file_id)
897                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
898                        .unwrap_or(false);
899                    let is_eligible_for_delete = eligible_for_delete
900                        .get(&file_id)
901                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
902                        .unwrap_or(false);
903
904                    should_delete_file(
905                        is_in_manifest,
906                        is_in_tmp_ref,
907                        is_linger,
908                        is_eligible_for_delete,
909                        is_region_dropped,
910                        &entry,
911                        unknown_file_may_linger_until,
912                    )
913                }
914            };
915
916            if should_delete {
917                let removed_file = match file_type {
918                    FileType::Parquet => {
919                        // notice this cause we don't track index version for parquet files
920                        // since entries comes from listing, we can't get index version from path
921                        RemovedFile::File(file_id, None)
922                    }
923                    FileType::Puffin(version) => {
924                        GC_ORPHANED_INDEX_FILES.inc();
925                        RemovedFile::Index(file_id, version)
926                    }
927                };
928                ready_for_delete.push(removed_file);
929            }
930        }
931        ready_for_delete
932    }
933
934    /// List files to be deleted based on their presence in the manifest, temporary references, and recently removed files.
935    /// Returns a vector of `RemovedFile` that are eligible for deletion.
936    ///
937    /// When `full_file_listing` is false, this method will only delete (subset of) files tracked in
938    /// `recently_removed_files`, which significantly
939    /// improves performance. When `full_file_listing` is true, it read from `all_entries` to find
940    /// and delete orphan files (files not tracked in the manifest).
941    ///
942    pub async fn list_to_be_deleted_files(
943        &self,
944        region_id: RegionId,
945        is_region_dropped: bool,
946        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
947        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
948        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
949        all_entries: Vec<Entry>,
950    ) -> Result<Vec<RemovedFile>> {
951        let now = chrono::Utc::now();
952        let may_linger_until = self
953            .opt
954            .lingering_time
955            .map(|lingering_time| {
956                chrono::Duration::from_std(lingering_time)
957                    .with_context(|_| DurationOutOfRangeSnafu {
958                        input: lingering_time,
959                    })
960                    .map(|t| now - t)
961            })
962            .transpose()?;
963
964        let unknown_file_may_linger_until = now
965            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
966                |_| DurationOutOfRangeSnafu {
967                    input: self.opt.unknown_file_lingering_time,
968                },
969            )?;
970
971        // files that may linger, which means they are not in use but may still be kept for a while
972        let threshold =
973            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
974        // TODO(discord9): if region is already closed, maybe handle threshold differently?
975        // is consider all files to be recently removed acceptable?
976        let mut recently_removed_files = recently_removed_files;
977        let may_linger_files = match threshold {
978            Some(threshold) => recently_removed_files.split_off(&threshold),
979            None => BTreeMap::new(),
980        };
981        debug!("may_linger_files: {:?}", may_linger_files);
982
983        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();
984
985        // known files(tracked in removed files field) that are eligible for removal
986        // (passed lingering time)
987        let eligible_for_removal = recently_removed_files
988            .values()
989            .flatten()
990            .collect::<HashSet<_>>();
991
992        // When full_file_listing is false, skip expensive list operations and only delete
993        // files that are tracked in recently_removed_files
994        if !self.full_file_listing {
995            // Only delete files that:
996            // 1. Are in recently_removed_files (tracked in manifest)
997            // 2. Are not in use(in manifest or tmp ref)
998            // 3. Have passed the lingering time
999            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
1000                .iter()
1001                .filter(|file_id| {
1002                    let in_use = match file_id {
1003                        RemovedFile::File(file_id, index_version) => {
1004                            in_manifest.get(file_id) == Some(index_version)
1005                                || in_tmp_ref.contains(&(*file_id, *index_version))
1006                        }
1007                        RemovedFile::Index(file_id, index_version) => {
1008                            in_manifest.get(file_id) == Some(&Some(*index_version))
1009                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
1010                        }
1011                    };
1012                    !in_use
1013                })
1014                .map(|&f| f.clone())
1015                .collect();
1016
1017            info!(
1018                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
1019                region_id,
1020                files_to_delete.len()
1021            );
1022
1023            return Ok(files_to_delete);
1024        }
1025
1026        // Full file listing mode: get the full list of files from object store
1027
1028        // Step 3: Filter files to determine which ones can be deleted
1029        let all_unused_files_ready_for_delete = self.filter_deletable_files(
1030            is_region_dropped,
1031            all_entries,
1032            in_manifest,
1033            in_tmp_ref,
1034            &all_may_linger_files,
1035            &eligible_for_removal,
1036            unknown_file_may_linger_until,
1037        );
1038
1039        Ok(all_unused_files_ready_for_delete)
1040    }
1041}
1042
/// Build the partition boundaries used to split a listing into roughly equal ranges.
///
/// File names are assumed to be evenly distributed uuid strings, so cutting the
/// hexadecimal key space into equal slices spreads the files evenly across partitions.
/// `concurrency` is rounded up to the next power of two `n`, and the `n - 1` boundaries
/// between the `n` slices are returned as zero-padded lowercase hex strings of equal
/// length, in ascending order.
///
/// * concurrency 1 (or 0) needs no boundary and yields an empty vector.
/// * concurrency 2 yields `["8"]`, dividing uuids in two by their first character.
/// * concurrency 32 yields `["08", "10", "18", ..., "f0", "f8"]`.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let partitions = concurrency.next_power_of_two();
    if partitions <= 1 {
        return Vec::new();
    }

    // `partitions` is a power of two, so its exponent is its number of trailing zeros.
    let bits = partitions.trailing_zeros();
    // One hex digit covers 4 bits: the smallest key length able to tell all
    // partitions apart is ceil(bits / 4).
    let hex_digits = bits.div_ceil(4);
    // Distance between two neighbouring boundaries: 16^hex_digits / partitions.
    let stride = 1u128 << (4 * hex_digits - bits);
    let width = hex_digits as usize;

    let mut boundaries = Vec::with_capacity(partitions - 1);
    for index in 1..partitions {
        let boundary = index as u128 * stride;
        boundaries.push(format!("{:0width$x}", boundary, width = width));
    }
    boundaries
}
1078
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the generated boundaries for a range of concurrency levels, including a
    /// value (3) that is not a power of two and a value (32) that needs two hex digits.
    #[test]
    fn test_gen_partition_from_concurrency() {
        // (concurrency, expected partition boundaries)
        let cases: [(usize, &[&str]); 7] = [
            (1, &[]),
            (2, &["8"]),
            (3, &["4", "8", "c"]),
            (4, &["4", "8", "c"]),
            (8, &["2", "4", "6", "8", "a", "c", "e"]),
            (
                16,
                &[
                    "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f",
                ],
            ),
            (
                32,
                &[
                    "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68",
                    "70", "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0",
                    "d8", "e0", "e8", "f0", "f8",
                ],
            ),
        ];

        for (concurrency, expected) in cases {
            let partitions = gen_partition_from_concurrency(concurrency);
            assert_eq!(partitions, expected);
        }
    }
}