// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GC worker which periodically checks and removes unused/obsolete SST files.
//!
//! `expel time`: the time when the file is considered removed, as in removed from the manifest.
//! `lingering time`: the time duration before deleting files after they are removed from the manifest.
//! `delta manifest`: the manifest files after the last checkpoint that contain the changes to the manifest.
//! `delete time`: the time when the file is actually deleted from the object store.
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint removed the actions from before the checkpoint.
//!

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::location::{self};

#[cfg(test)]
mod worker_test;

/// Limits the number of concurrent GC jobs on the datanode.
pub struct GcLimiter {
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Try to acquire a permit for a GC job.
    ///
    /// If no permit is available, returns a `TooManyGcJobs` error.
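    ///
    /// A minimal usage sketch (ignored doctest; the caller-side setup is assumed):
    ///
    /// ```ignore
    /// let limiter: GcLimiterRef = Arc::new(GcLimiter::new(4));
    /// match limiter.permit() {
    ///     // Hold the permit for the whole GC job; it is released on drop.
    ///     Ok(_permit) => { /* run the GC job here */ }
    ///     Err(err) => warn!("Skipping GC job: {err}"),
    /// }
    /// ```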
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}

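/// Options controlling the GC worker.
///
/// A minimal sketch of overriding the defaults in code (ignored doctest; the exact field set may evolve):
///
/// ```ignore
/// let cfg = GcConfig {
///     enable: true,
///     lingering_time: Some(Duration::from_secs(300)),
///     ..Default::default()
/// };
/// ```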
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long-running queries to finish.
    /// If set to `None`, unused files will be deleted immediately.
    ///
    /// TODO(discord9): long running queries should actively write tmp manifest files
    /// to prevent deletion of files they are using.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Lingering time before deleting unknown files (files with an undetermined expel time).
    /// The expel time is the time when the file is considered removed, as in removed from the manifest.
    /// This should only occur rarely, as the manifest keeps track of removed files in its
    /// `removed_files` field unless something goes wrong.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            // expect long-running queries to finish (or at least be able to notify that they are
            // using a deleted file) within a reasonable time
            lingering_time: Some(Duration::from_secs(60)),
            // 1 hour; used when the expel time (when the file got removed from the manifest) is
            // unknown. This should rarely happen, so we can afford to keep such files longer.
            unknown_file_lingering_time: Duration::from_secs(60 * 60),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    /// GC options, including the lingering time before deleting files.
    pub(crate) opt: GcConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: FileRefsManifest,
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's `removed_files`,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Create a new `LocalGcWorker` with the given `regions_to_gc` regions to GC.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    ///
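    /// A minimal driving sketch (ignored doctest; all inputs are assumed to be provided by the caller):
    ///
    /// ```ignore
    /// let worker = LocalGcWorker::try_new(
    ///     access_layer,
    ///     None,
    ///     regions,
    ///     GcConfig::default(),
    ///     file_ref_manifest,
    ///     &limiter,
    ///     /* full_file_listing */ false,
    /// )
    /// .await?;
    /// let report = worker.run().await?;
    /// ```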
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Get tmp ref files for all current regions.
    ///
    /// Outdated regions are added to the `outdated_regions` set, which means their manifest version in
    /// `self.file_ref_manifest` is older than the current manifest version on the datanode,
    /// so GC for them needs to be retried later by metasrv with updated tmp ref files.
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Run the GC worker in serial mode,
    /// since listing files can be slow and running multiple regions in parallel
    /// may cause too many concurrent listing operations.
    ///
    /// TODO(discord9): consider instead running in parallel mode
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Expected number of files covered per lister when listing a region's files.
    /// The derived listing concurrency is capped by `max_concurrent_lister_per_gc_job` to limit
    /// the number of concurrent listing operations while speeding up listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Perform GC for the region.
    /// 1. Get all the removed files in delta manifest files and their expel times
    /// 2. List all files in the region dir concurrently
    /// 3. Filter out the files that are still in use or may still be kept for a while
    /// 4. Delete the unused files
    ///
    /// Note that the files that are still in use or may still be kept for a while are not deleted
    /// to avoid deleting files that are still needed.
    pub async fn do_region_gc(
        &self,
        region: MitoRegionRef,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        let region_id = region.region_id();

        debug!("Doing gc for region {}", region_id);
        let manifest = region.manifest_ctx.manifest().await;
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;

        if recently_removed_files.is_empty() {
            // no recently removed files; log and continue, a full listing may still find orphan files
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

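        // Files referenced by the current manifest or pinned by ongoing queries (tmp refs)
        // must never be deleted.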
        let in_used: HashSet<FileId> = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency)
            .await?;

        let unused_file_cnt = unused_files.len();

        debug!(
            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {} ",
            current_files.len(),
            tmp_ref_files.len(),
            in_used.len(),
            removed_file_cnt,
            unused_files.len()
        );

        // TODO(discord9): for now, ignore async index file as its design is not stable, need to be improved once
        // index file design is stable
        let file_pairs: Vec<(FileId, FileId)> = unused_files
            .iter()
            .map(|file_id| (*file_id, *file_id))
            .collect();

        debug!(
            "Found {} unused index files to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        // TODO(discord9): update region manifest about deleted files
        self.update_manifest_removed_files(&region, unused_files.clone())
            .await?;

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
        GC_DELETE_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    /// Update the region manifest to clear the actually deleted files.
    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<FileId>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        let cnt = deleted_files.len();
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

    /// Get all the removed files in delta manifest files and their expel times.
    /// The expel time is the time when the file is considered removed,
    /// which is the last modified time of the delta manifest that contains the remove action.
    ///
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Create partitioned listers for concurrent file listing based on concurrency level.
    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();
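        // e.g. with concurrency 4 the partition keys are ["4", "8", "c"], so `bounds` becomes
        // [None, "4", "8", "c", None] and the windows below yield the key ranges
        // (.., "4"), ("4", "8"), ("8", "c"), ("c", ..).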

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Concurrently list all files in the region directory using the provided listers.
    /// Returns a vector of all file entries found across all partitions.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
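        // Each lister task collects its entries into one Vec and sends it exactly once; the
        // channel capacity (1024) far exceeds the number of listers, so the sends never block
        // even though receiving only starts after all tasks have finished.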
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // reached the end boundary, stop listing
                            e.name() < end.as_str()
                        } else {
                            // no end boundary, take all entries
                            true
                        }
                    }
                    // entry went wrong, log it and keep listing
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // notice that we only care about files, skip dirs
                            e.metadata().is_file()
                        } else {
                            // error entry, take for further logging
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                // ordering of files doesn't matter here, so we can send them directly
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        // Wait for all listers to finish
        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        drop(tx); // Drop the last sender so the receive loop below can terminate

        // Collect all entries from the channel
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Filter files to determine which ones can be deleted based on usage status and lingering time.
    /// Returns the file IDs that are safe to delete, plus the set of still-lingering files found in the region dir.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();
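        // A file is deleted only when it is neither referenced by the manifest nor by an ongoing
        // query, is not still lingering, and either has a known expel time (it appears in the
        // manifest's removed files) or, for unknown files, has a last modified time older than
        // `unknown_file_may_linger_until`.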

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    // if we can't parse the file id, it means it's not an SST or index file;
                    // don't delete it because we don't know what it is
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        // if the file's expel time is unknown (because it does not appear in the
                        // delta manifest), we keep it for a while based on its last modified time;
                        // note that unknown files use a different lingering time
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        // if the file did appear in the delta manifest (and passed the previous
                        // predicates), we can delete it immediately
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }


    /// Concurrently list unused files in the region dir,
    /// because there may be a lot of files in the region dir
    /// and listing them may take a long time.
    ///
    /// When `full_file_listing` is false, this method only returns files tracked in
    /// `recently_removed_files`, without performing expensive list operations, which significantly
    /// improves performance. When `full_file_listing` is true, it performs a full listing to
    /// also find orphan files for deletion.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: &HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let start = tokio::time::Instant::now();
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // files that may linger, which means they are not in use but may still be kept for a while
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
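        // `split_off(&threshold)` moves every entry whose expel time is >= threshold (i.e. files
        // removed within the lingering window) into `may_linger_files`, leaving only entries old
        // enough to delete in `recently_removed_files`.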
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        // in-use filenames, including SST and index files
        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use
            // 3. Have passed the lingering time
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
                start.elapsed().as_secs_f64(),
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: perform expensive list operations to find orphan files
        // Step 1: Create partitioned listers for concurrent processing
        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        // Step 2: Concurrently list all files in the region directory
        let all_entries = self.list_region_files_concurrent(listers).await?;

        let cnt = all_entries.len();

        // Step 3: Filter files to determine which ones can be deleted
        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
            start.elapsed().as_secs_f64(),
            region_id,
            all_unused_files_ready_for_delete.len()
        );
        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}


/// Generate partition prefixes based on concurrency,
/// assuming file names are evenly-distributed UUID strings,
/// so files are spread evenly across partitions.
/// For example, if concurrency is 2, the partition prefixes will be
/// ["8"], dividing UUIDs into two partitions based on the first character.
/// If concurrency is 32, the partition prefixes will be
/// ["08", "10", "18", "20", "28", "30", "38" ..., "f0", "f8"].
/// If concurrency is 1, it returns an empty vector.
///
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // `d` is the number of hex characters required to build the partition key.
    // `p` is the total number of possible values for a key of length `d`.
    // We need to find the smallest `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}