use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::time::Duration;

use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, RegionId};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
    Result, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::file_ref::TableFileRefsManifest;
use crate::sst::location::{self, region_dir_from_table_dir};

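/// Report of one GC run: the files deleted per region, plus regions whose file
/// references were outdated and should be retried.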
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GcReport {
    /// Files deleted in each region.
    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
    /// Regions whose file references were outdated; GC for them should be retried.
    pub need_retry_regions: HashSet<RegionId>,
}

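/// Options controlling how long removed or unknown files linger before GC
/// deletes them, and how many listers a GC job may use.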
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileGcOption {
    /// How long a file removed from the manifest lingers before it can be deleted.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Duration,
    /// How long a file unknown to any manifest lingers before it can be deleted.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent listers used by one GC job.
    pub max_concurrent_lister_per_gc_job: usize,
}

impl Default for FileGcOption {
    fn default() -> Self {
        Self {
            // 5 minutes
            lingering_time: Duration::from_secs(60 * 5),
            // 6 hours
            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
            max_concurrent_lister_per_gc_job: 32,
        }
    }
}

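/// A worker that garbage-collects unused SST files for the regions of a single
/// table, based on region manifests and temporary file references.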
pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Manifest manager for each region under GC.
    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
    pub(crate) opt: FileGcOption,
    /// Config used to open region manifest managers.
    pub(crate) manifest_open_config: ManifestOpenConfig,
    /// Snapshot of the table's temporary file references.
    pub(crate) file_ref_manifest: TableFileRefsManifest,
}

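/// Manifest-related settings, extracted from [`MitoConfig`], needed to open
/// region manifest managers.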
pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
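    /// Creates a [`LocalGcWorker`] for the given regions, which must all belong
    /// to the same table, and opens a manifest manager for each region.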
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeSet<RegionId>,
        opt: FileGcOption,
        manifest_open_config: ManifestOpenConfig,
        file_ref_manifest: TableFileRefsManifest,
    ) -> Result<Self> {
        let table_id = regions_to_gc
            .first()
            .context(UnexpectedSnafu {
                reason: "Expect at least one region, found none",
            })?
            .table_id();
        let mut zelf = Self {
            access_layer,
            cache_manager,
            manifest_mgrs: HashMap::new(),
            opt,
            manifest_open_config,
            file_ref_manifest,
        };

        for region_id in regions_to_gc {
            ensure!(
                region_id.table_id() == table_id,
                UnexpectedSnafu {
                    reason: format!(
                        "All regions should belong to the same table, found region {} and table {}",
                        region_id, table_id
                    ),
                }
            );
            let mgr = zelf.open_mgr_for(region_id).await?;
            zelf.manifest_mgrs.insert(region_id, mgr);
        }

        Ok(zelf)
    }

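    /// Collects temporarily referenced files per region from the file reference
    /// manifest. Regions whose current manifest version is newer than the version
    /// recorded in the reference manifest are marked as outdated so the caller
    /// can retry them later.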
    pub async fn read_tmp_ref_files(
        &self,
        outdated_regions: &mut HashSet<RegionId>,
    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        for (region_id, region_mgr) in &self.manifest_mgrs {
            let current_version = region_mgr.manifest().manifest_version;
            if &current_version
                > self
                    .file_ref_manifest
                    .manifest_version
                    .get(region_id)
                    .with_context(|| UnexpectedSnafu {
                        reason: format!(
                            "Region {} not found in tmp ref manifest version map",
                            region_id
                        ),
                    })?
            {
                outdated_regions.insert(*region_id);
            }
        }
        // Group the still-valid file references by region.
        let mut tmp_ref_files = HashMap::new();
        for file_ref in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(&file_ref.region_id) {
                continue;
            }
            tmp_ref_files
                .entry(file_ref.region_id)
                .or_insert_with(HashSet::new)
                .insert(file_ref.file_id);
        }

        Ok(tmp_ref_files)
    }

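    /// Runs GC for all regions of the table sequentially and returns a
    /// [`GcReport`] describing deleted files and regions that need a retry.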
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut outdated_regions = HashSet::new();
        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
        for region_id in self.manifest_mgrs.keys() {
            info!("Doing gc for region {}", region_id);
            let tmp_ref_files = tmp_ref_files.get(region_id).cloned().unwrap_or_default();
            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            info!("Gc for region {} finished", region_id);
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: outdated_regions.into_iter().collect(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
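    /// Rough number of current files each concurrent lister should cover; a
    /// region's lister concurrency is `current_files / CONCURRENCY_LIST_PER_FILES`,
    /// clamped to `[1, max_concurrent_lister_per_gc_job]`.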
    pub const CONCURRENCY_LIST_PER_FILES: usize = 512;

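    /// Runs GC for one region: collects in-use and recently removed files from
    /// the manifest, lists the region directory concurrently, and deletes files
    /// that are neither in use nor still lingering.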
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        info!("Doing gc for region {}", region_id);
        let manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;

        if recently_removed_files.is_empty() {
            // Unknown (orphan) files may still exist, so GC proceeds anyway.
            info!("No recently removed files to gc for region {}", region_id);
        }

        info!(
            "Found {} recently removed files sets for region {}",
            recently_removed_files.len(),
            region_id
        );

        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let in_used = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.iter().cloned())
            .collect();

        // Tmp ref files that are not part of the current manifest.
        let true_tmp_ref_files = tmp_ref_files
            .iter()
            .filter(|f| !current_files.contains_key(f))
            .collect::<HashSet<_>>();

        info!("True tmp ref files: {:?}", true_tmp_ref_files);

        let unused_files = self
            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
            .await?;

        let unused_len = unused_files.len();

        info!(
            "Found {} unused files to delete for region {}",
            unused_len, region_id
        );

        self.delete_files(region_id, &unused_files).await?;

        info!(
            "Successfully deleted {} unused files for region {}",
            unused_len, region_id
        );

        Ok(unused_files)
    }

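    /// Deletes the given files through the access layer and bumps the GC file
    /// counter metric.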
    async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        GC_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

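    /// Opens a manifest manager for the given region using the worker's
    /// manifest open config, failing if the region dir is empty.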
    async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let mito_config = &self.manifest_open_config;

        let region_manifest_options = RegionManifestOptions {
            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
                table_dir, region_id, path_type,
            )),
            object_store: self.access_layer.object_store().clone(),
            compress_type: manifest_compress_type(mito_config.compress_manifest),
            checkpoint_distance: mito_config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
            },
        };

        RegionManifestManager::open(
            region_manifest_options,
            Default::default(),
            Default::default(),
        )
        .await?
        .context(EmptyRegionDirSnafu {
            region_id,
            region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
        })
    }

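    /// Returns the files recorded as removed in the region manifest, grouped by
    /// the timestamp at which they were removed.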
    pub async fn get_removed_files_expel_times(
        &self,
        region_id: RegionId,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let region_manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();

        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

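    /// Splits the region dir listing into hex file-name ranges (one per lister;
    /// the effective concurrency is rounded up to a power of two) and pairs each
    /// lister with the exclusive end bound of its range (`None` for the last).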
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        // Bound the ranges with `None` on both ends so the first lister starts
        // from the beginning and the last one runs to the end.
        let bounds = std::iter::once(None)
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(std::iter::once(None))
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

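    /// Drives the partitioned listers concurrently and collects all file
    /// entries; listing errors are logged and skipped.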
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            // Stop once entries reach this partition's end bound.
                            e.name() < end.as_str()
                        } else {
                            true
                        }
                    }
                    Err(err) => {
                        // Log and keep going; failed entries are dropped later.
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            // Only keep file entries, not directories.
                            e.metadata().is_file()
                        } else {
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // Drop the last sender so `rx.recv()` terminates after all entries arrive.
        drop(tx);

        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

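    /// Partitions the listed entries into files that are safe to delete now and
    /// lingering files that still exist on storage. A file is deletable if it is
    /// neither in use nor lingering, and it either appears in the delta manifests
    /// or, being unknown, is older than the unknown-file lingering threshold.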
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        all_files_appear_in_delta_manifests: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    // Skip entries whose names are not valid file ids.
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !all_files_appear_in_delta_manifests.contains(&file_id) {
                        // Unknown file: only delete it once it is older than the
                        // unknown-file lingering threshold.
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        // Removed in a delta manifest and past its lingering
                        // time, hence safe to delete.
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

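    /// Lists the region dir and returns the files that can be deleted: files not
    /// referenced by the current manifest or tmp refs whose lingering time has
    /// passed, plus unknown files older than `unknown_file_lingering_time`.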
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let now = chrono::Utc::now();
        let may_linger_until = now
            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
                DurationOutOfRangeSnafu {
                    input: self.opt.lingering_time,
                }
            })?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // Files removed after this threshold may still linger and must be kept.
        let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
        let mut recently_removed_files = recently_removed_files;
        let may_linger_files = recently_removed_files.split_off(&threshold);
        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        // Files removed before the threshold appear in delta manifests and are
        // old enough to delete.
        let all_files_appear_in_delta_manifests = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        let listers = self.partition_region_files(region_id, concurrency).await?;

        let all_entries = self.list_region_files_concurrent(listers).await?;

        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &all_files_appear_in_delta_manifests,
                unknown_file_may_linger_until,
            );

        info!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}

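/// Generates `n - 1` hex boundaries that split the file-name keyspace into `n`
/// nearly equal ranges, where `n` is `concurrency` rounded up to the next power
/// of two. Boundaries use the fewest hex digits that can cover the space, e.g.
/// a concurrency of 4 yields `["4", "8", "c"]`.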
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // Find the smallest number of hex digits `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}