use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
    Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_DEL_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::location::{self, region_dir_from_table_dir};

/// Limits the number of concurrently running local GC jobs.
pub struct GcLimiter {
    /// Semaphore bounding the number of concurrent GC jobs.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Tries to acquire a permit for running a GC job.
    ///
    /// Returns a `TooManyGcJobs` error when the concurrency limit is reached.
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcConfig {
    /// Whether file garbage collection is enabled.
    pub enable: bool,
    /// How long a file removed from the manifest may linger before it becomes
    /// eligible for deletion.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Duration,
    /// How long a file unknown to the manifest may linger, judged by its
    /// last-modified time, before it becomes eligible for deletion.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent object store listers per GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of concurrent GC jobs.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            lingering_time: Duration::from_secs(60 * 5),
            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

/// Worker that performs garbage collection for regions belonging to one table.
pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Manifest managers of the regions to GC; all regions belong to the same table.
    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
    pub(crate) opt: GcConfig,
    pub(crate) manifest_open_config: ManifestOpenConfig,
    /// Temporary file references; files referenced here are treated as in use
    /// and are never deleted.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Permit that bounds the number of concurrently running GC workers.
    _permit: OwnedSemaphorePermit,
    /// Whether to list the region directory from the object store, or to rely
    /// only on files recorded as removed in the manifest.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeSet<RegionId>,
        opt: GcConfig,
        manifest_open_config: ManifestOpenConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let table_id = regions_to_gc
            .first()
            .context(UnexpectedSnafu {
                reason: "Expect at least one region, found none",
            })?
            .table_id();
        let permit = limiter.permit()?;
        let mut zelf = Self {
            access_layer,
            cache_manager,
            manifest_mgrs: HashMap::new(),
            opt,
            manifest_open_config,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        };

        for region_id in regions_to_gc {
            ensure!(
                region_id.table_id() == table_id,
                UnexpectedSnafu {
                    reason: format!(
                        "All regions should belong to the same table, but region {} does not belong to table {}",
                        region_id, table_id
                    ),
                }
            );
            let mgr = zelf.open_mgr_for(region_id).await?;
            zelf.manifest_mgrs.insert(region_id, mgr);
        }

        Ok(zelf)
    }

    /// Reads the temporarily referenced files for each region.
    ///
    /// Regions whose current manifest version is newer than the version recorded in
    /// the file reference manifest are marked as outdated so the caller can retry them.
    pub async fn read_tmp_ref_files(
        &self,
        outdated_regions: &mut HashSet<RegionId>,
    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        for (region_id, region_mgr) in &self.manifest_mgrs {
            let current_version = region_mgr.manifest().manifest_version;
            if &current_version
                > self
                    .file_ref_manifest
                    .manifest_version
                    .get(region_id)
                    .with_context(|| UnexpectedSnafu {
                        reason: format!(
                            "Region {} not found in tmp ref manifest version map",
                            region_id
                        ),
                    })?
            {
                outdated_regions.insert(*region_id);
            }
        }
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(region_id) {
                continue;
            }
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Runs GC for all tracked regions and returns a [`GcReport`] describing the
    /// deleted files and the regions that need to be retried.
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut outdated_regions = HashSet::new();
        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
        for region_id in self.manifest_mgrs.keys() {
            debug!("Doing gc for region {}", region_id);
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_default();
            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!("Gc for region {} finished", region_id);
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: outdated_regions.into_iter().collect(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Expected number of files per concurrent lister, used to decide how many
    /// listers to spawn for a region.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Performs GC for a single region: finds unused files and deletes them.
    pub async fn do_region_gc(
        &self,
        region_id: RegionId,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        debug!("Doing gc for region {}", region_id);
        let manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;

        if recently_removed_files.is_empty() {
            // Still continue: a full listing may find unused files that the manifest
            // no longer tracks.
            debug!("No recently removed files to gc for region {}", region_id);
        }

        debug!(
            "Found {} recently removed file sets for region {}",
            recently_removed_files.len(),
            region_id
        );

        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let in_used = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.iter().cloned())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
            .await?;

        let unused_len = unused_files.len();

        debug!(
            "Found {} unused files to delete for region {}",
            unused_len, region_id
        );

        let file_pairs: Vec<(FileId, FileId)> = unused_files
            .iter()
            .filter_map(|file_id| {
                current_files
                    .get(file_id)
                    .map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id()))
            })
            .collect();

        info!(
            "Found {} unused (data, index) file pairs to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_len, region_id
        );

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        GC_DEL_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let mito_config = &self.manifest_open_config;

        let region_manifest_options = RegionManifestOptions {
            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
                table_dir, region_id, path_type,
            )),
            object_store: self.access_layer.object_store().clone(),
            compress_type: manifest_compress_type(mito_config.compress_manifest),
            checkpoint_distance: mito_config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
            },
        };

        RegionManifestManager::open(
            region_manifest_options,
            Default::default(),
            Default::default(),
        )
        .await?
        .context(EmptyRegionDirSnafu {
            region_id,
            region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
        })
    }

    /// Returns the files recorded as removed in the region manifest, grouped by the
    /// timestamp at which they were removed.
    pub async fn get_removed_files_expel_times(
        &self,
        region_id: RegionId,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let region_manifest = self
            .manifest_mgrs
            .get(&region_id)
            .context(RegionNotFoundSnafu { region_id })?
            .manifest();

        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Partitions the region directory listing into multiple listers so listing can
    /// run concurrently. Each lister covers one range of the file name space and is
    /// paired with its (exclusive) end bound.
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Lists region files concurrently using the given listers, keeping only file
    /// entries that fall before each lister's end bound.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            e.name() < end.as_str()
                        } else {
                            true
                        }
                    }
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let entries = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            e.metadata().is_file()
                        } else {
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                tx.send(entries).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // Drop the last sender so `rx.recv()` returns `None` once all entries are drained.
        drop(tx);
        let mut all_entries = vec![];
        while let Some(entries) = rx.recv().await {
            all_entries.extend(entries.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Splits listed entries into files that can be deleted now and files that must
    /// keep lingering for a while.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        // A file unknown to the manifest: only delete it when it is old
                        // enough (by last-modified time) to be past its linger window.
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

    /// Computes the files that can be deleted for `region_id`.
    ///
    /// In fast mode (`full_file_listing == false`) only files recorded as removed in
    /// the manifest (and not still in use) are returned. In full listing mode the
    /// region directory is listed from the object store, and unknown files past their
    /// linger window are returned as well.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let start = tokio::time::Instant::now();
        let now = chrono::Utc::now();
        let may_linger_until = now
            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
                DurationOutOfRangeSnafu {
                    input: self.opt.lingering_time,
                }
            })?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
        // Files removed after the threshold may still linger; files removed before it
        // are eligible for removal.
        let mut recently_removed_files = recently_removed_files;
        let may_linger_files = recently_removed_files.split_off(&threshold);
        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        if !self.full_file_listing {
            // Fast mode: trust the manifest and skip listing the object store.
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
                start.elapsed().as_secs_f64(),
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        let all_entries = self.list_region_files_concurrent(listers).await?;

        let cnt = all_entries.len();

        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} listers for {cnt} files in region {}, found {} unused files to delete",
            start.elapsed().as_secs_f64(),
            region_id,
            all_unused_files_ready_for_delete.len()
        );
        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}

/// Generates partition boundaries that split the hexadecimal file name space into `n`
/// even ranges, where `n` is `concurrency` rounded up to the next power of two, and
/// returns the `n - 1` inner boundaries as lowercase hex strings.
///
/// For example, a concurrency of 2 yields `["8"]` and a concurrency of 3 or 4 yields
/// `["4", "8", "c"]`.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // Number of hex digits `d` needed so that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
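
    // The following tests are additional sketches, not part of the original test suite.
    // They exercise the GcLimiter permit accounting and the GcConfig defaults using
    // only items defined in this module.
    #[test]
    fn test_gc_limiter_permits() {
        let limiter = GcLimiter::new(2);
        assert_eq!(limiter.gc_concurrency(), 2);
        assert_eq!(limiter.running_gc_tasks(), 0);

        let p1 = limiter.permit().unwrap();
        let _p2 = limiter.permit().unwrap();
        assert_eq!(limiter.running_gc_tasks(), 2);
        // All permits are taken, so the next acquisition should fail (too many GC jobs).
        assert!(limiter.permit().is_err());

        // Dropping a permit frees a slot again.
        drop(p1);
        assert_eq!(limiter.running_gc_tasks(), 1);
        assert!(limiter.permit().is_ok());
    }

    #[test]
    fn test_gc_config_defaults() {
        let config = GcConfig::default();
        assert!(!config.enable);
        assert_eq!(config.lingering_time, Duration::from_secs(60 * 5));
        assert_eq!(
            config.unknown_file_lingering_time,
            Duration::from_secs(60 * 60 * 6)
        );
        assert_eq!(config.max_concurrent_lister_per_gc_job, 32);
        assert_eq!(config.max_concurrent_gc_job, 4);
    }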
}