mito2/gc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when the file is considered removed, i.e. removed from the manifest.
//! `lingering time`: the duration to keep files after they are removed from the manifest, before deleting them.
//! `delta manifest`: the manifest files after the last checkpoint that contain the changes to the manifest.
//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint
//! removed the actions recorded before it.
//!

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::time::Duration;

use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, RegionId};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
    Result, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::file_ref::TableFileRefsManifest;
use crate::sst::location::{self, region_dir_from_table_dir};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GcReport {
    /// Deleted files per region.
    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
    /// Regions that need a retry in the next gc round, usually because their tmp ref files are outdated.
    pub need_retry_regions: HashSet<RegionId>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileGcOption {
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    ///
    /// TODO(discord9): long running queries should actively write tmp manifest files
    /// to prevent deletion of files they are using.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Duration,
    /// Lingering time before deleting unknown files (files with an undetermined expel time).
    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
    /// Unknown files should only occur rarely, as the manifest keeps track of removed files
    /// in its `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
}

impl Default for FileGcOption {
    fn default() -> Self {
        Self {
            // expect long running queries to finish within a reasonable time
            lingering_time: Duration::from_secs(60 * 5),
            // 6 hours for files with an unknown expel time (i.e. when the file was removed from
            // the manifest). This should rarely happen, so such files can be kept longer.
            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
            max_concurrent_lister_per_gc_job: 32,
        }
    }
}

pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
    /// File GC options, e.g. the lingering time before deleting files.
    pub(crate) opt: FileGcOption,
    pub(crate) manifest_open_config: ManifestOpenConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains the manifest versions of regions at the time the tmp ref files were generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: TableFileRefsManifest,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Creates a new `LocalGcWorker` for the given `regions_to_gc` regions.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeSet<RegionId>,
        opt: FileGcOption,
        manifest_open_config: ManifestOpenConfig,
        file_ref_manifest: TableFileRefsManifest,
    ) -> Result<Self> {
        let table_id = regions_to_gc
            .first()
            .context(UnexpectedSnafu {
                reason: "Expect at least one region, found none",
            })?
            .table_id();
        let mut zelf = Self {
            access_layer,
            cache_manager,
            manifest_mgrs: HashMap::new(),
            opt,
            manifest_open_config,
            file_ref_manifest,
        };

        // dedup just in case
        for region_id in regions_to_gc {
            ensure!(
                region_id.table_id() == table_id,
                UnexpectedSnafu {
                    reason: format!(
                        "All regions should belong to the same table, found region {} which does not belong to table {}",
                        region_id, table_id
                    ),
                }
            );
            let mgr = zelf.open_mgr_for(region_id).await?;
            zelf.manifest_mgrs.insert(region_id, mgr);
        }

        Ok(zelf)
    }

    /// Get tmp ref files for all current regions.
    ///
    /// Regions whose tmp ref files are outdated are added to the `outdated_regions` set.
    pub async fn read_tmp_ref_files(
        &self,
        outdated_regions: &mut HashSet<RegionId>,
    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
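        // If a region's current manifest version is newer than the version recorded when the
        // tmp ref snapshot was taken, the snapshot may be stale, so the region is marked for
        // retry in the next gc round instead of being GC'ed with stale information.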
        for (region_id, region_mgr) in &self.manifest_mgrs {
            let current_version = region_mgr.manifest().manifest_version;
            if &current_version
                > self
                    .file_ref_manifest
                    .manifest_version
                    .get(region_id)
                    .with_context(|| UnexpectedSnafu {
                        reason: format!(
                            "Region {} not found in tmp ref manifest version map",
                            region_id
                        ),
                    })?
            {
                outdated_regions.insert(*region_id);
            }
        }
        // TODO(discord9): verify manifest version before reading tmp ref files

        let mut tmp_ref_files = HashMap::new();
        for file_ref in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(&file_ref.region_id) {
                // skip outdated regions
                continue;
            }
            tmp_ref_files
                .entry(file_ref.region_id)
                .or_insert_with(HashSet::new)
                .insert(file_ref.file_id);
        }

        Ok(tmp_ref_files)
    }

    /// Run the GC worker in serial mode.
    ///
    /// Listing files can be slow, and running multiple regions in parallel
    /// may cause too many concurrent listing operations.
    ///
    /// TODO(discord9): consider instead running in parallel mode
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut outdated_regions = HashSet::new();
        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
        for region_id in self.manifest_mgrs.keys() {
            info!("Doing gc for region {}", region_id);
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_default();
            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            info!("Gc for region {} finished", region_id);
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: outdated_regions.into_iter().collect(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Number of current files each lister is expected to cover; the lister concurrency per
    /// region is derived from this, to limit concurrent listing operations while still speeding up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 512;

    /// Perform GC for the region:
    /// 1. Get all the removed files in delta manifest files and their expel times.
    /// 2. List all files in the region dir concurrently.
    /// 3. Filter out the files that are still in use or may still be kept for a while.
    /// 4. Delete the unused files.
    ///
    /// Note that files that are still in use or within their lingering window are not deleted,
    /// to avoid removing files that are still needed.
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        info!("Doing gc for region {}", region_id);
        let manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;

        if recently_removed_files.is_empty() {
            // no recently removed files; unknown files may still be collected below
            info!("No recently removed files to gc for region {}", region_id);
        }

        info!(
            "Found {} recently removed file sets for region {}",
            recently_removed_files.len(),
            region_id
        );

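        // One lister per `CONCURRENCY_LIST_PER_FILES` current files, with at least one lister
        // and at most `max_concurrent_lister_per_gc_job`.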
        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

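        // Files referenced by the current manifest or pinned by ongoing queries must never be deleted.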
        let in_used = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.iter().cloned())
            .collect();

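        // Files referenced only by ongoing queries and not by the current manifest; logged for debugging.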
        let true_tmp_ref_files = tmp_ref_files
            .iter()
            .filter(|f| !current_files.contains_key(f))
            .collect::<HashSet<_>>();

        info!("True tmp ref files: {:?}", true_tmp_ref_files);

        let unused_files = self
            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
            .await?;

        let unused_len = unused_files.len();

        info!(
            "Found {} unused files to delete for region {}",
            unused_len, region_id
        );

        self.delete_files(region_id, &unused_files).await?;

        info!(
            "Successfully deleted {} unused files for region {}",
            unused_len, region_id
        );

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        GC_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    /// Open the manifest manager for the region.
    async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let mito_config = &self.manifest_open_config;

        let region_manifest_options = RegionManifestOptions {
            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
                table_dir, region_id, path_type,
            )),
            object_store: self.access_layer.object_store().clone(),
            compress_type: manifest_compress_type(mito_config.compress_manifest),
            checkpoint_distance: mito_config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
            },
        };

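        // `open` yields `None` when no manifest exists for the region, which is surfaced here
        // as an `EmptyRegionDir` error since there is nothing to GC against.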
        RegionManifestManager::open(
            region_manifest_options,
            Default::default(),
            Default::default(),
        )
        .await?
        .context(EmptyRegionDirSnafu {
            region_id,
            region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
        })
    }

    /// Get all the files removed in delta manifest files together with their expel times.
    /// The expel time is the time when the file is considered removed from the manifest,
    /// taken from the `removed_at` field of the manifest's removed files.
    pub async fn get_removed_files_expel_times(
        &self,
        region_id: RegionId,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let region_manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();

        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Create partitioned listers for concurrent file listing based on concurrency level.
    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
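        // Bounds are [None, p1, p2, ..., None]; each adjacent pair (start, end) becomes one
        // lister that starts after `start` and stops once an entry name reaches `end`, so the
        // listers together cover the whole region dir without overlap.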
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
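        // Each lister runs in its own task and sends its collected entries back through a
        // bounded channel; the receiver drains them after all tasks have finished.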
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // reached the end boundary, stop listing
                            e.name() < end.as_str()
                        } else {
                            // no end boundary, take all entries
                            true
                        }
                    }
                    // an entry errored, log it and keep listing
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // notice that we only care about files, skip dirs
                            e.metadata().is_file()
                        } else {
                            // keep error entries; they are dropped when collecting results
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                // ordering of files doesn't matter here, so we can send them directly
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Close the channel so the receive loop below terminates

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Filter files to determine which ones can be deleted based on usage status and lingering time.
    /// Returns the file IDs that are safe to delete, together with the set of lingering files
    /// that still exist in the region dir.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        all_files_appear_in_delta_manifests: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it means it's not a sst or index file
                    // shouldn't delete it because we don't know what it is
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

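            // A file may be deleted only if it is not referenced by the current manifest or an
            // ongoing query, is not within its lingering window, and, when its expel time is
            // unknown, its last modified time is older than the unknown-file lingering cutoff.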
            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !all_files_appear_in_delta_manifests.contains(&file_id) {
                        // if the file's expel time is unknown (because it doesn't appear in the
                        // delta manifests), we keep it for a while based on its last modified time;
                        // notice unknown files use a different lingering time
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        // if the file did appear in the delta manifests (and passes the previous
                        // predicates), we can delete it immediately
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

    /// Concurrently list the region dir and return the files that can be deleted.
    /// There may be a lot of files in the region dir,
    /// so listing them with a single lister may take a long time.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let now = chrono::Utc::now();
        let may_linger_until = now
            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
                DurationOutOfRangeSnafu {
                    input: self.opt.lingering_time,
                }
            })?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
        let mut recently_removed_files = recently_removed_files;
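        // `split_off` keeps entries with expel time < threshold (old enough to delete) in
        // `recently_removed_files` and returns entries with expel time >= threshold, i.e. files
        // removed too recently that must keep lingering.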
        let may_linger_files = recently_removed_files.split_off(&threshold);
        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let all_files_appear_in_delta_manifests = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // in use filenames, include sst and index files
        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        // Step 1: Create partitioned listers for concurrent processing
        let listers = self.partition_region_files(region_id, concurrency).await?;

        // Step 2: Concurrently list all files in the region directory
        let all_entries = self.list_region_files_concurrent(listers).await?;

        // Step 3: Filter files to determine which ones can be deleted
        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &all_files_appear_in_delta_manifests,
                unknown_file_may_linger_until,
            );

        info!(
            "Lingering files still present in region dir: {:?}",
            all_in_exist_linger_files
        );

        Ok(all_unused_files_ready_for_delete)
    }
}

/// Generate partition prefixes based on concurrency, assuming file names are
/// evenly-distributed uuid strings, so that files are spread evenly across partitions.
/// The concurrency is rounded up to the next power of two.
/// For example, if concurrency is 2, the partition prefixes will be
/// ["8"], dividing uuids into two partitions based on the first character.
/// If concurrency is 32, the partition prefixes will be
/// ["08", "10", "18", "20", "28", "30", "38", ..., "f0", "f8"].
/// If concurrency is 1, an empty vector is returned.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // `d` is the number of hex characters required to build the partition key.
    // `p` is the total number of possible values for a key of length `d`.
    // We need to find the smallest `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

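    // Boundaries at i * step for i in 1..n, formatted as `d`-digit hex strings, split the
    // keyspace into `n` equally sized ranges; these boundaries are the returned prefixes.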
    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
677}