// mito2: gc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! Terminology:
//! - `expel time`: the time when the file is considered as removed, as in removed from the manifest.
//! - `lingering time`: the time duration before deleting files after they are removed from the manifest.
//! - `delta manifest`: the manifest files after the last checkpoint that contain the changes to the manifest.
//! - `delete time`: the time when the file is actually deleted from the object store.
//! - `unknown files`: files that are not recorded in the manifest, usually due to a saved checkpoint
//!   which removed actions before the checkpoint.
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27
28use common_meta::datanode::GcStat;
29use common_telemetry::tracing::Instrument as _;
30use common_telemetry::{debug, error, info, warn};
31use common_time::Timestamp;
32use itertools::Itertools;
33use object_store::{Entry, Lister};
34use serde::{Deserialize, Serialize};
35use snafu::{ResultExt as _, ensure};
36use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
37use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
38use tokio_stream::StreamExt;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheManagerRef;
42use crate::cache::file_cache::FileType;
43use crate::config::MitoConfig;
44use crate::error::{
45    DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
46    TooManyGcJobsSnafu, UnexpectedSnafu,
47};
48use crate::manifest::action::{RegionManifest, RemovedFile};
49use crate::metrics::{
50    GC_DELETE_FILE_CNT, GC_DURATION_SECONDS, GC_ERRORS_TOTAL, GC_FILES_DELETED_TOTAL,
51    GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
52};
53use crate::region::{MitoRegionRef, RegionRoleState};
54use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
55use crate::sst::location::{self};
56
// Unit tests for the GC worker live in the sibling `worker_test` module.
#[cfg(test)]
mod worker_test;
59
60/// Helper function to determine if a file should be deleted based on common logic
61/// shared between Parquet and Puffin file types.
62fn should_delete_file(
63    is_in_manifest: bool,
64    is_in_tmp_ref: bool,
65    is_linger: bool,
66    is_eligible_for_delete: bool,
67    is_region_dropped: bool,
68    _entry: &Entry,
69    _unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
70) -> bool {
71    let is_known = is_linger || is_eligible_for_delete;
72
73    !is_in_manifest
74        && !is_in_tmp_ref
75        && if is_known {
76            is_eligible_for_delete
77        } else {
78            !is_in_tmp_ref && is_region_dropped
79        }
80}
81
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
    // Semaphore created with `gc_concurrency` permits; one permit is held per
    // running GC job (see `permit`).
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    // The total number of permits the semaphore was created with.
    gc_concurrency: usize,
}

/// Shared reference to a [`GcLimiter`].
pub type GcLimiterRef = Arc<GcLimiter>;
89
90impl GcLimiter {
91    pub fn new(gc_concurrency: usize) -> Self {
92        Self {
93            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
94            gc_concurrency,
95        }
96    }
97
98    pub fn running_gc_tasks(&self) -> u32 {
99        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
100    }
101
102    pub fn gc_concurrency(&self) -> u32 {
103        self.gc_concurrency as u32
104    }
105
106    pub fn gc_stat(&self) -> GcStat {
107        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
108    }
109
110    /// Try to acquire a permit for a GC job.
111    ///
112    /// If no permit is available, returns an `TooManyGcJobs` error.
113    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
114        self.gc_job_limit
115            .clone()
116            .try_acquire_owned()
117            .map_err(|e| match e {
118                TryAcquireError::Closed => UnexpectedSnafu {
119                    reason: format!("Failed to acquire gc permit: {e}"),
120                }
121                .build(),
122                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
123            })
124    }
125}
126
/// Configuration for the GC worker.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    /// If set to None, then unused files will be deleted immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with undetermined expel time).
    /// The expel time is the time when the file is considered as removed, as in removed from
    /// the manifest. This should only occur rarely, as the manifest keeps track of removals
    /// in its `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}
152
153impl Default for GcConfig {
154    fn default() -> Self {
155        Self {
156            enable: false,
157            // expect long running queries to be finished(or at least be able to notify it's using a deleted file) within a reasonable time
158            lingering_time: Some(Duration::from_secs(60)),
159            // 1 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
160            unknown_file_lingering_time: Duration::from_secs(60 * 60),
161            max_concurrent_lister_per_gc_job: 32,
162            max_concurrent_gc_job: 4,
163        }
164    }
165}
166
/// A worker that garbage-collects unused SST/index files for a set of regions
/// that all belong to the same table (enforced in `try_new`).
pub struct LocalGcWorker {
    // Access layer used to build region directories and delete objects.
    pub(crate) access_layer: AccessLayerRef,
    // Passed along to file-deletion routines (presumably to invalidate cached
    // entries for deleted files — confirm against `delete_files`/`delete_index`).
    pub(crate) cache_manager: Option<CacheManagerRef>,
    // Regions to GC; `None` means the region is not open locally (might be dropped).
    pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
    /// Lingering time before deleting files.
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    // Held for the worker's lifetime to bound datanode-wide GC concurrency.
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}
189
/// Subset of [`MitoConfig`] options needed for opening a region manifest.
/// Each field mirrors the `MitoConfig` field of the same name.
pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}
196
197impl From<MitoConfig> for ManifestOpenConfig {
198    fn from(mito_config: MitoConfig) -> Self {
199        Self {
200            compress_manifest: mito_config.compress_manifest,
201            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
202            experimental_manifest_keep_removed_file_count: mito_config
203                .experimental_manifest_keep_removed_file_count,
204            experimental_manifest_keep_removed_file_ttl: mito_config
205                .experimental_manifest_keep_removed_file_ttl,
206        }
207    }
208}
209
210impl LocalGcWorker {
211    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
212    /// The regions are specified by their `RegionId` and should all belong to the same table.
213    ///
214    #[allow(clippy::too_many_arguments)]
215    pub async fn try_new(
216        access_layer: AccessLayerRef,
217        cache_manager: Option<CacheManagerRef>,
218        regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
219        opt: GcConfig,
220        file_ref_manifest: FileRefsManifest,
221        limiter: &GcLimiterRef,
222        full_file_listing: bool,
223    ) -> Result<Self> {
224        if let Some(first_region_id) = regions_to_gc.keys().next() {
225            let table_id = first_region_id.table_id();
226            for region_id in regions_to_gc.keys() {
227                ensure!(
228                    region_id.table_id() == table_id,
229                    InvalidRequestSnafu {
230                        region_id: *region_id,
231                        reason: format!(
232                            "Region {} does not belong to table {}",
233                            region_id, table_id
234                        ),
235                    }
236                );
237            }
238        }
239
240        let permit = limiter.permit()?;
241
242        Ok(Self {
243            access_layer,
244            cache_manager,
245            regions: regions_to_gc,
246            opt,
247            file_ref_manifest,
248            _permit: permit,
249            full_file_listing,
250        })
251    }
252
253    /// Get tmp ref files for all current regions
254    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
255        let mut tmp_ref_files = HashMap::new();
256        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
257            tmp_ref_files
258                .entry(*region_id)
259                .or_insert_with(HashSet::new)
260                .extend(file_refs.clone());
261            // no need to include manifest files here, as they are already included in region manifest
262        }
263
264        Ok(tmp_ref_files)
265    }
266
267    /// Run the GC worker in serial mode,
268    /// considering list files could be slow and run multiple regions in parallel
269    /// may cause too many concurrent listing operations.
270    ///
271    /// TODO(discord9): consider instead running in parallel mode
272    #[common_telemetry::tracing::instrument(
273        skip_all,
274        fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
275    )]
276    pub async fn run(self) -> Result<GcReport> {
277        info!("LocalGcWorker started");
278        let _timer = GC_DURATION_SECONDS
279            .with_label_values(&["total"])
280            .start_timer();
281        let now = std::time::Instant::now();
282
283        let mut deleted_files = HashMap::new();
284        let mut deleted_indexes = HashMap::new();
285        let tmp_ref_files = self.read_tmp_ref_files().await?;
286        for (region_id, region) in &self.regions {
287            let per_region_time = std::time::Instant::now();
288            if region.as_ref().map(|r| r.manifest_ctx.current_state())
289                == Some(RegionRoleState::Follower)
290            {
291                return UnexpectedSnafu {
292                    reason: format!(
293                        "Region {} is in Follower state, should not run GC on follower regions",
294                        region_id
295                    ),
296                }
297                .fail();
298            }
299            let tmp_ref_files = tmp_ref_files
300                .get(region_id)
301                .cloned()
302                .unwrap_or_else(HashSet::new);
303            let files = self
304                .do_region_gc(*region_id, region.clone(), &tmp_ref_files)
305                .await?;
306            let index_files = files
307                .iter()
308                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
309                .collect_vec();
310            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
311            deleted_indexes.insert(*region_id, index_files);
312            debug!(
313                "GC for region {} took {} secs.",
314                region_id,
315                per_region_time.elapsed().as_secs_f32()
316            );
317        }
318        info!(
319            "LocalGcWorker finished after {} secs.",
320            now.elapsed().as_secs_f32()
321        );
322        let report = GcReport {
323            deleted_files,
324            deleted_indexes,
325            need_retry_regions: HashSet::new(),
326        };
327        Ok(report)
328    }
329}
330
impl LocalGcWorker {
    /// Concurrency of listing files per region: one lister is created per this
    /// many files (see `list_from_object_store`).
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
335
    /// Perform GC for the region.
    /// 1. Get all the removed files in delta manifest files and their expel times
    /// 2. List all files in the region dir concurrently
    /// 3. Filter out the files that are still in use or may still be kept for a while
    /// 4. Delete the unused files
    ///
    /// Note that the files that are still in use or may still be kept for a while are not deleted
    /// to avoid deleting files that are still needed.
    ///
    /// Returns the files that were actually deleted. Returns an empty list (and
    /// bumps the `manifest_mismatch` error counter) when the tmp-ref manifest
    /// version is stale for this region. Errors with `InvalidRequest` when the
    /// region is absent and `full_file_listing` is off, since there would be
    /// nothing safe to do.
    #[common_telemetry::tracing::instrument(
        skip_all,
        fields(
            region_id = %region_id,
            full_file_listing = self.full_file_listing,
            region_present = region.is_some()
        )
    )]
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        region: Option<MitoRegionRef>,
        tmp_ref_files: &HashSet<FileRef>,
    ) -> Result<Vec<RemovedFile>> {
        // Metric label distinguishes the expensive full-listing mode from fast mode.
        let mode = if self.full_file_listing {
            "full_listing"
        } else {
            "fast"
        };
        GC_RUNS_TOTAL.with_label_values(&[mode]).inc();
        debug!(
            "Doing gc for region {}, {}",
            region_id,
            if region.is_some() {
                "region found"
            } else {
                "region not found, might be dropped"
            }
        );

        // A missing (dropped) region has no manifest; without a full listing we
        // would have no source of truth at all, so refuse the request.
        ensure!(
            region.is_some() || self.full_file_listing,
            InvalidRequestSnafu {
                region_id,
                reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
            }
        );

        let manifest = if let Some(region) = &region {
            let manifest = region.manifest_ctx.manifest().await;
            // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
            let file_ref_manifest_version = self
                .file_ref_manifest
                .manifest_version
                .get(&region.region_id())
                .cloned();
            if file_ref_manifest_version != Some(manifest.manifest_version) {
                // should be rare enough(few seconds after leader update manifest version), just skip gc for this region
                warn!(
                    "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
                    file_ref_manifest_version,
                    region.region_id(),
                    manifest.manifest_version
                );
                GC_ERRORS_TOTAL
                    .with_label_values(&["manifest_mismatch"])
                    .inc();
                return Ok(vec![]);
            }
            Some(manifest)
        } else {
            None
        };

        let all_entries = if let Some(manifest) = &manifest
            && self.full_file_listing
        {
            // do the time consuming listing only when full_file_listing is true(and region is open)
            // and do it first to make sure we have the latest manifest etc.
            self.list_from_object_store(region_id, manifest.files.len())
                .await?
        } else if manifest.is_none() && self.full_file_listing {
            // if region is already dropped, we have no manifest to refer to,
            // so only do gc if `full_file_listing` is true, otherwise just skip it
            // TODO(discord9): is doing one serial listing enough here?
            self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
                .await?
        } else {
            vec![]
        };

        // Files removed from the manifest recently, grouped by expel time.
        let recently_removed_files = if let Some(manifest) = &manifest {
            self.get_removed_files_expel_times(manifest).await?
        } else {
            Default::default()
        };

        if recently_removed_files.is_empty() {
            // no files to remove, skip
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let current_files = manifest.as_ref().map(|m| &m.files);

        // file id -> index version for every file the manifest still references.
        let in_manifest = if let Some(current_files) = current_files {
            current_files
                .iter()
                .map(|(file_id, meta)| (*file_id, meta.index_version()))
                .collect::<HashMap<_, _>>()
        } else {
            Default::default()
        };

        let is_region_dropped = region.is_none();

        // (file id, index version) pairs still referenced by ongoing queries.
        let in_tmp_ref = tmp_ref_files
            .iter()
            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
            .collect::<HashSet<_>>();

        let deletable_files = self
            .list_to_be_deleted_files(
                region_id,
                is_region_dropped,
                &in_manifest,
                &in_tmp_ref,
                recently_removed_files,
                all_entries,
            )
            .await?;

        let unused_file_cnt = deletable_files.len();

        info!(
            "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
            if region.is_none() {
                "(region dropped)"
            } else {
                ""
            },
            current_files.map(|c| c.len()).unwrap_or(0),
            tmp_ref_files.len(),
            removed_file_cnt,
            deletable_files.len(),
        );
        debug!(
            "gc: deletable files for region {}: {:?}",
            region_id, &deletable_files
        );

        debug!(
            "Found {} unused index files to delete for region {}",
            deletable_files.len(),
            region_id
        );

        let _delete_timer = GC_DURATION_SECONDS
            .with_label_values(&["delete_files"])
            .start_timer();
        self.delete_files(region_id, &deletable_files).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // Only an open region has a manifest to update with the cleared files.
        if let Some(region) = &region {
            let _update_timer = GC_DURATION_SECONDS
                .with_label_values(&["update_manifest"])
                .start_timer();
            self.update_manifest_removed_files(region, deletable_files.clone())
                .await?;
        }

        Ok(deletable_files)
    }
514
515    #[common_telemetry::tracing::instrument(
516        skip_all,
517        fields(region_id = %region_id, removed_file_count = removed_files.len())
518    )]
519    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
520        let mut index_ids = vec![];
521        let file_pairs = removed_files
522            .iter()
523            .filter_map(|f| match f {
524                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
525                RemovedFile::Index(file_id, index_version) => {
526                    let region_index_id =
527                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
528                    index_ids.push(region_index_id);
529                    None
530                }
531            })
532            .collect_vec();
533        delete_files(
534            region_id,
535            &file_pairs,
536            true,
537            &self.access_layer,
538            &self.cache_manager,
539        )
540        .await?;
541
542        if !file_pairs.is_empty() {
543            let deleted_count = file_pairs.len() as u64;
544            GC_FILES_DELETED_TOTAL
545                .with_label_values(&["parquet"])
546                .inc_by(deleted_count);
547            GC_DELETE_FILE_CNT.inc_by(deleted_count);
548        }
549
550        for index_id in index_ids {
551            match delete_index(index_id, &self.access_layer, &self.cache_manager).await {
552                Ok(()) => {
553                    GC_FILES_DELETED_TOTAL.with_label_values(&["index"]).inc();
554                    GC_DELETE_FILE_CNT.inc();
555                }
556                Err(err) => {
557                    GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
558                    return Err(err);
559                }
560            }
561        }
562
563        Ok(())
564    }
565
566    /// Update region manifest for clear the actually deleted files
567    #[common_telemetry::tracing::instrument(
568        skip_all,
569        fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
570    )]
571    async fn update_manifest_removed_files(
572        &self,
573        region: &MitoRegionRef,
574        deleted_files: Vec<RemovedFile>,
575    ) -> Result<()> {
576        let deleted_file_cnt = deleted_files.len();
577        debug!(
578            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
579            region.region_id()
580        );
581
582        let mut manager = region.manifest_ctx.manifest_manager.write().await;
583        let cnt = deleted_files.len();
584        manager.clear_deleted_files(deleted_files);
585        debug!(
586            "Updated region_id={} region manifest to clear {cnt} deleted files",
587            region.region_id(),
588        );
589
590        Ok(())
591    }
592
593    /// Get all the removed files in delta manifest files and their expel times.
594    /// The expel time is the time when the file is considered as removed.
595    /// Which is the last modified time of delta manifest which contains the remove action.
596    ///
597    pub async fn get_removed_files_expel_times(
598        &self,
599        region_manifest: &Arc<RegionManifest>,
600    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
601        let mut ret = BTreeMap::new();
602        for files in &region_manifest.removed_files.removed_files {
603            let expel_time = Timestamp::new_millisecond(files.removed_at);
604            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
605            set.extend(files.files.iter().cloned());
606        }
607
608        Ok(ret)
609    }
610
611    /// Create partitioned listers for concurrent file listing based on concurrency level.
612    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
613    async fn partition_region_files(
614        &self,
615        region_id: RegionId,
616        concurrency: usize,
617    ) -> Result<Vec<(Lister, Option<String>)>> {
618        let region_dir = self.access_layer.build_region_dir(region_id);
619
620        let partitions = gen_partition_from_concurrency(concurrency);
621        let bounds = vec![None]
622            .into_iter()
623            .chain(partitions.iter().map(|p| Some(p.clone())))
624            .chain(vec![None])
625            .collect::<Vec<_>>();
626
627        let mut listers = vec![];
628        for part in bounds.windows(2) {
629            let start = part[0].clone();
630            let end = part[1].clone();
631            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
632            if let Some(s) = start {
633                lister = lister.start_after(&s);
634            }
635
636            let lister = lister.await.context(OpenDalSnafu)?;
637            listers.push((lister, end));
638        }
639
640        Ok(listers)
641    }
642
643    /// List all files in the region directory.
644    /// Returns a vector of all file entries found.
645    /// This might take a long time if there are many files in the region directory.
646    #[common_telemetry::tracing::instrument(
647        skip_all,
648        fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
649    )]
650    async fn list_from_object_store(
651        &self,
652        region_id: RegionId,
653        file_cnt_hint: usize,
654    ) -> Result<Vec<Entry>> {
655        let _timer = GC_DURATION_SECONDS
656            .with_label_values(&["list_files"])
657            .start_timer();
658        let start = tokio::time::Instant::now();
659        let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
660            .max(1)
661            .min(self.opt.max_concurrent_lister_per_gc_job);
662
663        let listers = self
664            .partition_region_files(region_id, concurrency)
665            .await
666            .inspect_err(|_| {
667                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
668            })?;
669        let lister_cnt = listers.len();
670
671        // Step 2: Concurrently list all files in the region directory
672        let all_entries = self
673            .list_region_files_concurrent(listers)
674            .await
675            .inspect_err(|_| {
676                GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
677            })?;
678        let cnt = all_entries.len();
679        info!(
680            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
681            start.elapsed().as_secs_f64(),
682            region_id
683        );
684        Ok(all_entries)
685    }
686
    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    ///
    /// Each lister runs in its own spawned task and sends ONE message (its full
    /// batch of entries) over the channel; results are drained after all tasks
    /// complete. Entries that fail to list are logged and dropped.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        // NOTE(review): handles are awaited BEFORE the channel is drained, which
        // only works because each task sends a single message and the channel
        // capacity (1024) exceeds any realistic lister count (capped by
        // `max_concurrent_lister_per_gc_job`) — confirm the cap stays below 1024.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(
                async move {
                    // Stop at this partition's exclusive end boundary, if any.
                    let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                        Ok(e) => {
                            if let Some(end) = &end {
                                // reach end, stop listing
                                e.name() < end.as_str()
                            } else {
                                // no end, take all entries
                                true
                            }
                        }
                        // entry went wrong, log and skip it
                        Err(err) => {
                            warn!("Failed to list entry: {}", err);
                            true
                        }
                    });
                    let stream = stream
                        .filter(|e| {
                            if let Ok(e) = &e {
                                // notice that we only care about files, skip dirs
                                e.metadata().is_file()
                            } else {
                                // error entry, take for further logging
                                true
                            }
                        })
                        .collect::<Vec<_>>()
                        .await;
                    // ordering of files doesn't matter here, so we can send them directly
                    tx.send(stream).await.expect("Failed to send entries");
                }
                .instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
            );

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel to stop receiving

        // Collect all entries from the channel; error entries were already
        // logged in the tasks and are discarded here.
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }
752
753    #[allow(clippy::too_many_arguments)]
754    fn filter_deletable_files(
755        &self,
756        is_region_dropped: bool,
757        entries: Vec<Entry>,
758        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
759        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
760        may_linger_files: &HashSet<&RemovedFile>,
761        eligible_for_delete: &HashSet<&RemovedFile>,
762        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
763    ) -> Vec<RemovedFile> {
764        let mut ready_for_delete = vec![];
765        // all group by file id for easier checking
766        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
767            in_tmp_ref
768                .iter()
769                .fold(HashMap::new(), |mut acc, (file, version)| {
770                    let indices = acc.entry(*file).or_default();
771                    if let Some(version) = version {
772                        indices.insert(*version);
773                    }
774                    acc
775                });
776
777        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
778            .iter()
779            .fold(HashMap::new(), |mut acc, file| {
780                let indices = acc.entry(file.file_id()).or_default();
781                indices.insert(file);
782                acc
783            });
784
785        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
786            .iter()
787            .fold(HashMap::new(), |mut acc, file| {
788                let indices = acc.entry(file.file_id()).or_default();
789                indices.insert(file);
790                acc
791            });
792
793        for entry in entries {
794            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
795                Ok((file_id, file_type)) => (file_id, file_type),
796                Err(err) => {
797                    error!(err; "Failed to parse file id from path: {}", entry.name());
798                    // if we can't parse the file id, it means it's not a sst or index file
799                    // shouldn't delete it because we don't know what it is
800                    GC_SKIPPED_UNPARSABLE_FILES.inc();
801                    continue;
802                }
803            };
804
805            let should_delete = match file_type {
806                FileType::Parquet => {
807                    let is_in_manifest = in_manifest.contains_key(&file_id);
808                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
809                    let is_linger = may_linger_files.contains_key(&file_id);
810                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
811
812                    should_delete_file(
813                        is_in_manifest,
814                        is_in_tmp_ref,
815                        is_linger,
816                        is_eligible_for_delete,
817                        is_region_dropped,
818                        &entry,
819                        unknown_file_may_linger_until,
820                    )
821                }
822                FileType::Puffin(version) => {
823                    // notice need to check both file id and version
824                    let is_in_manifest = in_manifest
825                        .get(&file_id)
826                        .map(|opt_ver| *opt_ver == Some(version))
827                        .unwrap_or(false);
828                    let is_in_tmp_ref = in_tmp_ref
829                        .get(&file_id)
830                        .map(|versions| versions.contains(&version))
831                        .unwrap_or(false);
832                    let is_linger = may_linger_files
833                        .get(&file_id)
834                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
835                        .unwrap_or(false);
836                    let is_eligible_for_delete = eligible_for_delete
837                        .get(&file_id)
838                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
839                        .unwrap_or(false);
840
841                    should_delete_file(
842                        is_in_manifest,
843                        is_in_tmp_ref,
844                        is_linger,
845                        is_eligible_for_delete,
846                        is_region_dropped,
847                        &entry,
848                        unknown_file_may_linger_until,
849                    )
850                }
851            };
852
853            if should_delete {
854                let removed_file = match file_type {
855                    FileType::Parquet => {
856                        // notice this cause we don't track index version for parquet files
857                        // since entries comes from listing, we can't get index version from path
858                        RemovedFile::File(file_id, None)
859                    }
860                    FileType::Puffin(version) => {
861                        GC_ORPHANED_INDEX_FILES.inc();
862                        RemovedFile::Index(file_id, version)
863                    }
864                };
865                ready_for_delete.push(removed_file);
866            }
867        }
868        ready_for_delete
869    }
870
    /// Lists files to be deleted based on their presence in the manifest, temporary references,
    /// and recently removed files.
    ///
    /// Returns a vector of `RemovedFile` that are eligible for deletion.
    ///
    /// When `full_file_listing` is false, this method will only delete (a subset of) files tracked
    /// in `recently_removed_files`, which significantly improves performance. When
    /// `full_file_listing` is true, it reads from `all_entries` to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// # Errors
    /// Returns an error if a configured lingering duration cannot be represented as a
    /// `chrono::Duration`.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        is_region_dropped: bool,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        // Files expelled at or after `may_linger_until` are still within their lingering
        // window and must be kept. `None` means no lingering time is configured.
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        // Cutoff for "unknown files" (not recorded in the manifest): such files are only
        // deleted once they are older than this instant.
        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        // TODO(discord9): if region is already closed, maybe handle threshold differently?
        // is consider all files to be recently removed acceptable?
        let mut recently_removed_files = recently_removed_files;
        // `split_off` returns entries with expel time >= threshold (removed recently, still
        // lingering) and keeps older entries (past lingering) in `recently_removed_files`.
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        // known files(tracked in removed files field) that are eligible for removal
        // (passed lingering time)
        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use(in manifest or tmp ref)
            // 3. Have passed the lingering time
            //
            // NOTE(review): a file still present in the manifest under a *different*
            // index version compares unequal below and is treated as not in use —
            // confirm this is intended for index-version upgrades.
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file_id| {
                    let in_use = match file_id {
                        RemovedFile::File(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: get the full list of files from object store

        // Step 3: Filter files to determine which ones can be deleted
        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            is_region_dropped,
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
978}
979
/// Generate partition prefixes based on concurrency, assuming file names are
/// evenly-distributed uuid strings, so files split evenly across partitions.
///
/// For example, with concurrency 2 the prefixes are `["8"]`, dividing uuids into
/// two partitions by their first hex character. With concurrency 32 they are
/// `["08", "10", "18", ..., "f0", "f8"]`. With concurrency 1 an empty vector is
/// returned (no partitioning).
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    // Round up to a power of two so the hex key space divides evenly.
    let parts = concurrency.next_power_of_two();
    if parts <= 1 {
        return vec![];
    }

    // Find the smallest hex-digit count `width` with 16^width >= parts;
    // `space` ends up as 16^width, the total number of keys of that length.
    let mut width = 0usize;
    let mut space: u128 = 1;
    while space < parts as u128 {
        space *= 16;
        width += 1;
    }

    // Emit the `parts - 1` interior boundaries of the evenly divided key space,
    // zero-padded to `width` hex characters.
    let stride = space / parts as u128;
    let mut prefixes = Vec::with_capacity(parts - 1);
    for i in 1..parts as u128 {
        prefixes.push(format!("{:0width$x}", i * stride, width = width));
    }
    prefixes
}
1015
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        // Table-driven: concurrency -> expected partition boundary prefixes.
        let cases: &[(usize, &[&str])] = &[
            (1, &[]),
            (2, &["8"]),
            (3, &["4", "8", "c"]),
            (4, &["4", "8", "c"]),
            (8, &["2", "4", "6", "8", "a", "c", "e"]),
            (
                16,
                &[
                    "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f",
                ],
            ),
            (
                32,
                &[
                    "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68",
                    "70", "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0",
                    "d8", "e0", "e8", "f0", "f8",
                ],
            ),
        ];

        for (concurrency, expected) in cases {
            let got = gen_partition_from_concurrency(*concurrency);
            assert_eq!(got, *expected, "concurrency={}", concurrency);
        }
    }
}