// mito2/gc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when the file is considered as removed, as in removed from the manifest.
//! `lingering time`: the time duration before deleting files after they are removed from manifest.
//! `delta manifest`: the manifest files after the last checkpoint that contain the changes to the manifest.
//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually due to a saved checkpoint which removes actions before the checkpoint.
//!
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use itertools::Itertools;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt as _, ensure};
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::FileType;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
    TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::{RegionManifest, RemovedFile};
use crate::metrics::{
    GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
    GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
};
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes};
use crate::sst::location::{self};
56
// Unit tests for the GC worker live in a sibling file.
#[cfg(test)]
mod worker_test;
59
60/// Helper function to determine if a file should be deleted based on common logic
61/// shared between Parquet and Puffin file types.
62fn should_delete_file(
63    is_in_manifest: bool,
64    is_in_tmp_ref: bool,
65    is_linger: bool,
66    is_eligible_for_delete: bool,
67    is_region_dropped: bool,
68    _entry: &Entry,
69    _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71    let is_known = is_linger || is_eligible_for_delete;
72
73    !is_in_manifest
74        && !is_in_tmp_ref
75        && if is_known {
76            is_eligible_for_delete
77        } else {
78            !is_in_tmp_ref && is_region_dropped
79        }
80}
81
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
    /// Semaphore holding one permit per allowed concurrent GC job.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    /// Total number of permits `gc_job_limit` was created with.
    gc_concurrency: usize,
}

/// Shared reference to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91    pub fn new(gc_concurrency: usize) -> Self {
92        Self {
93            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94            gc_concurrency,
95        }
96    }
97
98    pub fn running_gc_tasks(&self) -> u32 {
99        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100    }
101
102    pub fn gc_concurrency(&self) -> u32 {
103        self.gc_concurrency as u32
104    }
105
106    pub fn gc_stat(&self) -> GcStat {
107        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108    }
109
110    /// Try to acquire a permit for a GC job.
111    ///
112    /// If no permit is available, returns an `TooManyGcJobs` error.
113    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114        self.gc_job_limit
115            .clone()
116            .try_acquire_owned()
117            .map_err(|e| match e {
118                TryAcquireError::Closed => UnexpectedSnafu {
119                    reason: format!("Failed to acquire gc permit: {e}"),
120                }
121                .build(),
122                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123            })
124    }
125}
126
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    /// If set to None, then unused files will be deleted immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with undetermined expel time).
    /// The expel time is the time when the file is considered as removed, as in removed from the manifest.
    /// This should only occur rarely, as the manifest keeps track of removals in its
    /// `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154    fn default() -> Self {
155        Self {
156            enable: false,
157            // expect long running queries to be finished(or at least be able to notify it's using a deleted file) within a reasonable time
158            lingering_time: Some(Duration::from_secs(60)),
159            // 1 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
160            unknown_file_lingering_time: Duration::from_secs(60 * 60),
161            max_concurrent_lister_per_gc_job: 32,
162            max_concurrent_gc_job: 4,
163        }
164    }
165}
166
/// Worker that garbage-collects unused SST and index files for a set of
/// regions, all belonging to the same table.
pub struct LocalGcWorker {
    /// Access layer used to list and delete files in the object store.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional cache manager so caches can be purged for deleted files.
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to GC; `None` means the region is not open here
    /// (typically because it was dropped).
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// GC configuration (lingering times and concurrency limits).
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Concurrency permit acquired from [`GcLimiter`]; released on drop.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}
189
/// Manifest-related settings extracted from [`MitoConfig`]
/// (see the identically named fields there for details).
pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198    fn from(mito_config: MitoConfig) -> Self {
199        Self {
200            compress_manifest: mito_config.compress_manifest,
201            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202            experimental_manifest_keep_removed_file_count: mito_config
203                .experimental_manifest_keep_removed_file_count,
204            experimental_manifest_keep_removed_file_ttl: mito_config
205                .experimental_manifest_keep_removed_file_ttl,
206        }
207    }
208}
209
210impl LocalGcWorker {
211    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
212    /// The regions are specified by their `RegionId` and should all belong to the same table.
213    ///
214    #[allow(clippy::too_many_arguments)]
215    pub async fn try_new(
216        access_layer: AccessLayerRef,
217        cache_manager: Option<CacheManagerRef>,
218        regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
219        opt: GcConfig,
220        file_ref_manifest: FileRefsManifest,
221        limiter: &GcLimiterRef,
222        full_file_listing: bool,
223    ) -> Result<Self> {
224        if let Some(first_region_id) = regions_to_gc.keys().next() {
225            let table_id = first_region_id.table_id();
226            for region_id in regions_to_gc.keys() {
227                ensure!(
228                    region_id.table_id() == table_id,
229                    InvalidRequestSnafu {
230                        region_id: *region_id,
231                        reason: format!(
232                            "Region {} does not belong to table {}",
233                            region_id, table_id
234                        ),
235                    }
236                );
237            }
238        }
239
240        let permit = limiter.permit()?;
241
242        Ok(Self {
243            access_layer,
244            cache_manager,
245            regions: regions_to_gc,
246            opt,
247            file_ref_manifest,
248            _permit: permit,
249            full_file_listing,
250        })
251    }
252
253    /// Get tmp ref files for all current regions
254    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
255        let mut tmp_ref_files = HashMap::new();
256        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
257            tmp_ref_files
258                .entry(*region_id)
259                .or_insert_with(HashSet::new)
260                .extend(file_refs.clone());
261            // no need to include manifest files here, as they are already included in region manifest
262        }
263
264        Ok(tmp_ref_files)
265    }
266
267    /// Run the GC worker in serial mode,
268    /// considering list files could be slow and run multiple regions in parallel
269    /// may cause too many concurrent listing operations.
270    ///
271    /// TODO(discord9): consider instead running in parallel mode
272    #[common_telemetry::tracing::instrument(
273        skip_all,
274        fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
275    )]
276    pub async fn run(self) -> Result<GcReport> {
277        info!("LocalGcWorker started");
278        let _timer = GC_DURATION_SECONDS
279            .with_label_values(&["total"])
280            .start_timer();
281        let now = std::time::Instant::now();
282
283        let mut deleted_files = HashMap::new();
284        let mut deleted_indexes = HashMap::new();
285        let mut processed_regions = HashSet::new();
286        let tmp_ref_files = self.read_tmp_ref_files().await?;
287        for (region_id, region) in &self.regions {
288            let per_region_time = std::time::Instant::now();
289            if region.as_ref().map(|r| r.manifest_ctx.current_state())
290                == Some(RegionRoleState::Follower)
291            {
292                return UnexpectedSnafu {
293                    reason: format!(
294                        "Region {} is in Follower state, should not run GC on follower regions",
295                        region_id
296                    ),
297                }
298                .fail();
299            }
300            let tmp_ref_files = tmp_ref_files
301                .get(region_id)
302                .cloned()
303                .unwrap_or_else(HashSet::new);
304            let files = self
305                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
306                .await?;
307            let index_files = files
308                .iter()
309                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
310                .collect_vec();
311            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
312            deleted_indexes.insert(*region_id, index_files);
313            processed_regions.insert(*region_id);
314            debug!(
315                "GC for region {} took {} secs.",
316                region_id,
317                per_region_time.elapsed().as_secs_f32()
318            );
319        }
320        info!(
321            "LocalGcWorker finished after {} secs.",
322            now.elapsed().as_secs_f32()
323        );
324        let report = GcReport {
325            deleted_files,
326            deleted_indexes,
327            need_retry_regions: HashSet::new(),
328            processed_regions,
329        };
330        Ok(report)
331    }
332}
333
334impl LocalGcWorker {
    /// Concurrency of listing files per region: one lister is allotted per
    /// this many files (hint). This limits the number of concurrent listing
    /// operations while still speeding up listing of large region dirs.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
338
    /// Perform GC for the region.
    /// 1. Get all the removed files in delta manifest files and their expel times
    /// 2. List all files in the region dir concurrently
    /// 3. Filter out the files that are still in use or may still be kept for a while
    /// 4. Delete the unused files
    ///
    /// Note that the files that are still in use or may still be kept for a while are not deleted
    /// to avoid deleting files that are still needed.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(
            region_id = %region_id,
            full_file_listing = self.full_file_listing,
            region_present = region.is_some()
        )
    )]
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        region: Option<MitoRegionRef>,
        tmp_ref_files: &HashSet<FileRef>,
    ) -> Result<Vec<RemovedFile>> {
        // Metric label distinguishes full-listing runs from fast runs.
        let mode = if self.full_file_listing {
            "full_listing"
        } else {
            "fast"
        };
        GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
        debug!(
            "Doing gc for region {}, {}",
            region_id,
            if region.is_some() {
                "region found"
            } else {
                "region not found, might be dropped"
            }
        );

        // A missing region can only be GC'ed via a full listing, since there
        // is no manifest to consult.
        ensure!(
            region.is_some() || self.full_file_listing,
            InvalidRequestSnafu {
                region_id,
                reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
            }
        );

        let manifest = if let Some(region) = &region {
            let manifest = region.manifest_ctx.manifest().await;
            // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
            let file_ref_manifest_version = self
                .file_ref_manifest
                .manifest_version
                .get(&region.region_id())
                .cloned();
            if file_ref_manifest_version != Some(manifest.manifest_version) {
                // should be rare enough(few seconds after leader update manifest version), just skip gc for this region
                warn!(
                    "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
                    file_ref_manifest_version,
                    region.region_id(),
                    manifest.manifest_version
                );
                GC_ERRORS_TOTAL
                    .with_label_values(&["manifest_mismatch"])
                    .inc();
                return Ok(vec![]);
            }
            Some(manifest)
        } else {
            None
        };

        let all_entries = if let Some(manifest) = &manifest
            && self.full_file_listing
        {
            // do the time consuming listing only when full_file_listing is true(and region is open)
            // and do it first to make sure we have the latest manifest etc.
            self.list_from_object_store(region_id, manifest.files.len())
                .await?
        } else if manifest.is_none() && self.full_file_listing {
            // if region is already dropped, we have no manifest to refer to,
            // so only do gc if `full_file_listing` is true, otherwise just skip it
            // TODO(discord9): is doing one serial listing enough here?
            self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
                .await?
        } else {
            vec![]
        };

        // Removed files grouped by their expel time, from the manifest's
        // removed-files bookkeeping (empty when region is dropped).
        let recently_removed_files = if let Some(manifest) = &manifest {
            self.get_removed_files_expel_times(manifest).await?
        } else {
            Default::default()
        };

        if recently_removed_files.is_empty() {
            // no files to remove, skip
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let current_files = manifest.as_ref().map(|m| &m.files);

        // Files currently referenced by the manifest: file id -> index version.
        let in_manifest = if let Some(current_files) = current_files {
            current_files
                .iter()
                .map(|(file_id, meta)| (*file_id, meta.index_version()))
                .collect::<HashMap<_, _>>()
        } else {
            Default::default()
        };

        let is_region_dropped = region.is_none();

        // Files currently referenced by ongoing queries.
        let in_tmp_ref = tmp_ref_files
            .iter()
            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
            .collect::<HashSet<_>>();

        let deletable_files = self
            .list_to_be_deleted_files(
                region_id,
                is_region_dropped,
                &in_manifest,
                &in_tmp_ref,
                recently_removed_files,
                all_entries,
            )
            .await?;

        let unused_file_cnt = deletable_files.len();

        info!(
            "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
            if region.is_none() {
                "(region dropped)"
            } else {
                ""
            },
            current_files.map(|c| c.len()).unwrap_or(0),
            tmp_ref_files.len(),
            removed_file_cnt,
            deletable_files.len(),
        );
        debug!(
            "gc: deletable files for region {}: {:?}",
            region_id, &deletable_files
        );

        debug!(
            "Found {} unused index files to delete for region {}",
            deletable_files.len(),
            region_id
        );

        let _delete_timer = GC_DURATION_SECONDS
            .with_label_values(&["delete_files"])
            .start_timer();
        self.delete_files(region_id, &deletable_files).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // Only an open region has a manifest to update; for dropped regions
        // there is nothing to record.
        if let Some(region) = &region {
            let _update_timer = GC_DURATION_SECONDS
                .with_label_values(&["update_manifest"])
                .start_timer();
            self.update_manifest_removed_files(region, deletable_files.clone())
                .await?;
        }

        Ok(deletable_files)
    }
517
518    #[common_telemetry::tracing::instrument(
519        skip_all,
520        fields(region_id = %region_id, removed_file_count = removed_files.len())
521    )]
522    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
523        let mut index_ids = vec![];
524        let file_pairs = removed_files
525            .iter()
526            .filter_map(|f| match f {
527                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
528                RemovedFile::Index(file_id, index_version) => {
529                    let region_index_id =
530                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
531                    index_ids.push(region_index_id);
532                    None
533                }
534            })
535            .collect_vec();
536        delete_files(
537            region_id,
538            &file_pairs,
539            true,
540            &self.access_layer,
541            &self.cache_manager,
542        )
543        .await?;
544
545        if !file_pairs.is_empty() {
546            let deleted_count = file_pairs.len() as u64;
547            GC_FILES_DELETED_TOTAL
548                .with_label_values(&["parquet"])
549                .inc_by(deleted_count);
550            GC_DELETE_FILE_CNT.inc_by(deleted_count);
551        }
552
553        if !index_ids.is_empty() {
554            let deleted_count = index_ids.len() as u64;
555            delete_indexes(&index_ids, &self.access_layer, &self.cache_manager)
556                .await
557                .inspect_err(|_| {
558                    GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
559                })?;
560            GC_FILES_DELETED_TOTAL
561                .with_label_values(&["index"])
562                .inc_by(deleted_count);
563            GC_DELETE_FILE_CNT.inc_by(deleted_count);
564        }
565
566        Ok(())
567    }
568
569    /// Update region manifest for clear the actually deleted files
570    #[common_telemetry::tracing::instrument(
571        skip_all,
572        fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
573    )]
574    async fn update_manifest_removed_files(
575        &self,
576        region: &MitoRegionRef,
577        deleted_files: Vec<RemovedFile>,
578    ) -> Result<()> {
579        let deleted_file_cnt = deleted_files.len();
580        debug!(
581            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
582            region.region_id()
583        );
584
585        let mut manager = region.manifest_ctx.manifest_manager.write().await;
586        let cnt = deleted_files.len();
587        manager.clear_deleted_files(deleted_files);
588        debug!(
589            "Updated region_id={} region manifest to clear {cnt} deleted files",
590            region.region_id(),
591        );
592
593        Ok(())
594    }
595
596    /// Get all the removed files in delta manifest files and their expel times.
597    /// The expel time is the time when the file is considered as removed.
598    /// Which is the last modified time of delta manifest which contains the remove action.
599    ///
600    pub async fn get_removed_files_expel_times(
601        &self,
602        region_manifest: &Arc<RegionManifest>,
603    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
604        let mut ret = BTreeMap::new();
605        for files in &region_manifest.removed_files.removed_files {
606            let expel_time = Timestamp::new_millisecond(files.removed_at);
607            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
608            set.extend(files.files.iter().cloned());
609        }
610
611        Ok(ret)
612    }
613
614    /// Create partitioned listers for concurrent file listing based on concurrency level.
615    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
616    async fn partition_region_files(
617        &self,
618        region_id: RegionId,
619        concurrency: usize,
620    ) -> Result<Vec<(Lister, Option<String>)>> {
621        let region_dir = self.access_layer.build_region_dir(region_id);
622
623        let partitions = gen_partition_from_concurrency(concurrency);
624        let bounds = vec![None]
625            .into_iter()
626            .chain(partitions.iter().map(|p| Some(p.clone())))
627            .chain(vec![None])
628            .collect::<Vec<_>>();
629
630        let mut listers = vec![];
631        for part in bounds.windows(2) {
632            let start = part[0].clone();
633            let end = part[1].clone();
634            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
635            if let Some(s) = start {
636                lister = lister.start_after(&s);
637            }
638
639            let lister = lister.await.context(OpenDalSnafu)?;
640            listers.push((lister, end));
641        }
642
643        Ok(listers)
644    }
645
646    /// List all files in the region directory.
647    /// Returns a vector of all file entries found.
648    /// This might take a long time if there are many files in the region directory.
649    #[common_telemetry::tracing::instrument(
650        skip_all,
651        fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
652    )]
653    async fn list_from_object_store(
654        &self,
655        region_id: RegionId,
656        file_cnt_hint: usize,
657    ) -> Result<Vec<Entry>> {
658        let _timer = GC_DURATION_SECONDS
659            .with_label_values(&["list_files"])
660            .start_timer();
661        let start = tokio::time::Instant::now();
662        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
663            .max(1)
664            .min(self.opt.max_concurrent_lister_per_gc_job);
665
666        let listers = self
667            .partition_region_files(region_id, concurrency)
668            .await
669            .inspect_err(|_| {
670                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
671            })?;
672        let lister_cnt = listers.len();
673
674        // Step 2: Concurrently list all files in the region directory
675        let all_entries = self
676            .list_region_files_concurrent(listers)
677            .await
678            .inspect_err(|_| {
679                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
680            })?;
681        let cnt = all_entries.len();
682        info!(
683            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
684            start.elapsed().as_secs_f64(),
685            region_id
686        );
687        Ok(all_entries)
688    }
689
    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    ///
    /// NOTE(review): each spawned task sends exactly one batch, and all tasks
    /// are joined before the channel is drained; this relies on the number of
    /// listers staying below the channel capacity (1024) so `tx.send` never
    /// blocks while we wait on `handle.await` — confirm against the
    /// `max_concurrent_lister_per_gc_job` bound.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(
                async move {
                    let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                        Ok(e) => {
                            if let Some(end) = &end {
                                // reach end, stop listing
                                e.name() < end.as_str()
                            } else {
                                // no end, take all entries
                                true
                            }
                        }
                        // entry went wrong, log and skip it
                        Err(err) => {
                            warn!("Failed to list entry: {}", err);
                            true
                        }
                    });
                    let stream = stream
                        .filter(|e| {
                            if let Ok(e) = &e {
                                // notice that we only care about files, skip dirs
                                e.metadata().is_file()
                            } else {
                                // error entry, take for further logging
                                true
                            }
                        })
                        .collect::<Vec<_>>()
                        .await;
                    // ordering of files doesn't matter here, so we can send them directly
                    tx.send(stream).await.expect("Failed to send entries");
                }
                .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
            );

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel to stop receiving

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            // Errored entries were only kept for logging; drop them here.
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }
755
    /// Decide which of the listed object-store `entries` are safe to delete for one region.
    ///
    /// Each entry's path is parsed into a `(FileId, FileType)`; entries whose names cannot be
    /// parsed are skipped (and counted via `GC_SKIPPED_UNPARSABLE_FILES`) because we cannot tell
    /// what they are. For every parsed file the final decision is delegated to
    /// `should_delete_file`, fed with four membership flags:
    /// - `in_manifest`: file ids (with their optional index version) still recorded in the manifest;
    /// - `in_tmp_ref`: files temporarily referenced (e.g. by ongoing reads/writes);
    /// - `may_linger_files`: removed files still inside their lingering window;
    /// - `eligible_for_delete`: removed files that have passed the lingering window.
    ///
    /// Files found in none of these sets ("unknown files", typically hidden by a manifest
    /// checkpoint) are subject to `unknown_file_may_linger_until` inside `should_delete_file`
    /// (entry metadata vs. that cutoff — see that helper for the exact rule).
    ///
    /// Returns the `RemovedFile`s that are ready to be deleted from the object store.
    #[allow(clippy::too_many_arguments)]
    fn filter_deletable_files(
        &self,
        is_region_dropped: bool,
        entries: Vec<Entry>,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        may_linger_files: &HashSet<&RemovedFile>,
        eligible_for_delete: &HashSet<&RemovedFile>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> Vec<RemovedFile> {
        let mut ready_for_delete = vec![];
        // all group by file id for easier checking
        // NOTE: `entry(*file).or_default()` registers the file id even when `version` is None,
        // so the Parquet branch's `contains_key` check below still sees version-less refs.
        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
            in_tmp_ref
                .iter()
                .fold(HashMap::new(), |mut acc, (file, version)| {
                    let indices = acc.entry(*file).or_default();
                    if let Some(version) = version {
                        indices.insert(*version);
                    }
                    acc
                });

        // Group lingering files by id; a single id may map to several `RemovedFile`
        // variants (data file plus one or more index versions).
        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        // Same id-keyed regrouping for files past their lingering window.
        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        for entry in entries {
            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
                Ok((file_id, file_type)) => (file_id, file_type),
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it means it's not a sst or index file
                    // shouldn't delete it because we don't know what it is
                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                    continue;
                }
            };

            let should_delete = match file_type {
                FileType::Parquet => {
                    // Data files are matched by file id only: listing can't recover an
                    // index version from a parquet path, so presence of the id in any
                    // of the sets counts.
                    let is_in_manifest = in_manifest.contains_key(&file_id);
                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
                    let is_linger = may_linger_files.contains_key(&file_id);
                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
                FileType::Puffin(version) => {
                    // notice need to check both file id and version
                    let is_in_manifest = in_manifest
                        .get(&file_id)
                        .map(|opt_ver| *opt_ver == Some(version))
                        .unwrap_or(false);
                    let is_in_tmp_ref = in_tmp_ref
                        .get(&file_id)
                        .map(|versions| versions.contains(&version))
                        .unwrap_or(false);
                    let is_linger = may_linger_files
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);
                    let is_eligible_for_delete = eligible_for_delete
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        is_region_dropped,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
            };

            if should_delete {
                let removed_file = match file_type {
                    FileType::Parquet => {
                        // notice this cause we don't track index version for parquet files
                        // since entries comes from listing, we can't get index version from path
                        RemovedFile::File(file_id, None)
                    }
                    FileType::Puffin(version) => {
                        GC_ORPHANED_INDEX_FILES.inc();
                        RemovedFile::Index(file_id, version)
                    }
                };
                ready_for_delete.push(removed_file);
            }
        }
        ready_for_delete
    }
873
    /// List files to be deleted based on their presence in the manifest, temporary references,
    /// and recently removed files.
    /// Returns a vector of `RemovedFile` that are eligible for deletion.
    ///
    /// When `full_file_listing` is false, this method only deletes a (subset of) files tracked in
    /// `recently_removed_files`, which significantly improves performance (no object-store
    /// listing needed). When `full_file_listing` is true, it reads from `all_entries` to also
    /// find and delete orphan files (files not tracked in the manifest).
    ///
    /// Parameters:
    /// - `is_region_dropped`: whether the whole region has been dropped (forwarded to filtering).
    /// - `in_manifest`: files currently referenced by the manifest, with their index version.
    /// - `in_tmp_ref`: files temporarily referenced outside the manifest.
    /// - `recently_removed_files`: removed files keyed by their expel timestamp.
    /// - `all_entries`: full object-store listing; only consulted when `full_file_listing` is true.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        is_region_dropped: bool,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        // Cutoff instant: files expelled after this are still lingering.
        // None when no lingering time is configured (nothing lingers).
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        // Separate (always-present) cutoff for files not tracked by the manifest at all.
        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        // TODO(discord9): if region is already closed, maybe handle threshold differently?
        // is consider all files to be recently removed acceptable?
        let mut recently_removed_files = recently_removed_files;
        // `split_off` returns entries with key >= threshold (expelled recently => still linger)
        // and keeps entries with key < threshold (past the lingering window) in
        // `recently_removed_files`.
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        // known files(tracked in removed files field) that are eligible for removal
        // (passed lingering time)
        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use(in manifest or tmp ref)
            // 3. Have passed the lingering time
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file_id| {
                    // "In use" requires an exact (file id, index version) match; a manifest
                    // entry with a different index version does not protect this file.
                    let in_use = match file_id {
                        RemovedFile::File(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: get the full list of files from object store

        // Filter the listed entries to determine which ones can be deleted,
        // including orphan files unknown to the manifest.
        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            is_region_dropped,
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
981}
982
/// Build hex-string boundaries that split the uuid file-name space into
/// roughly equal partitions for concurrent listing, assuming file names
/// are evenly-distributed uuid strings.
///
/// `concurrency` is rounded up to the next power of two `n`, and the result
/// contains the `n - 1` interior boundaries (the outermost buckets are
/// implicitly bounded by the ends of the key space). Examples:
/// * concurrency 1  -> `[]` (no partitioning needed)
/// * concurrency 2  -> `["8"]` (split uuids on the first character)
/// * concurrency 32 -> `["08", "10", "18", ..., "f0", "f8"]`
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let partitions = concurrency.next_power_of_two();
    if partitions <= 1 {
        return vec![];
    }

    // Find the shortest hex prefix length `width` whose key space
    // (16^width values) is large enough to host `partitions` buckets.
    let mut width = 0usize;
    let mut key_space: u128 = 1;
    while key_space < partitions as u128 {
        key_space *= 16;
        width += 1;
    }

    // Emit evenly spaced boundaries, zero-padded to `width` hex digits.
    let step = key_space / partitions as u128;
    let mut boundaries = Vec::with_capacity(partitions - 1);
    for idx in 1..partitions {
        boundaries.push(format!("{:0width$x}", idx as u128 * step));
    }
    boundaries
}
1018
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check of partition boundary generation for various
    /// concurrency levels, including non-power-of-two round-up (3 -> 4).
    #[test]
    fn test_gen_partition_from_concurrency() {
        // A single worker needs no partitioning at all.
        assert!(gen_partition_from_concurrency(1).is_empty());

        let cases: &[(usize, &[&str])] = &[
            (2, &["8"]),
            // 3 rounds up to 4, so both produce identical boundaries.
            (3, &["4", "8", "c"]),
            (4, &["4", "8", "c"]),
            (8, &["2", "4", "6", "8", "a", "c", "e"]),
            (
                16,
                &[
                    "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f",
                ],
            ),
            (
                32,
                &[
                    "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60",
                    "68", "70", "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0",
                    "c8", "d0", "d8", "e0", "e8", "f0", "f8",
                ],
            ),
        ];

        for (concurrency, expected) in cases {
            assert_eq!(
                gen_partition_from_concurrency(*concurrency),
                *expected,
                "concurrency = {concurrency}"
            );
        }
    }
}