use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::location;

#[cfg(test)]
mod worker_test;

/// Limits the number of concurrent GC jobs.
pub struct GcLimiter {
    /// Semaphore holding one permit per allowed concurrent GC job.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    /// Number of GC tasks currently holding a permit.
    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Tries to acquire a permit for a new GC job.
    ///
    /// Returns a `TooManyGcJobs` error if all permits are taken.
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}
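
// A minimal illustrative check (a sketch, not part of the original test suite)
// of the semaphore-backed limiter: permits beyond `gc_concurrency` are
// rejected and `running_gc_tasks` tracks outstanding permits.
#[cfg(test)]
#[test]
fn gc_limiter_permit_sketch() {
    let limiter = GcLimiter::new(2);
    let _p1 = limiter.permit().expect("first permit");
    let _p2 = limiter.permit().expect("second permit");
    assert_eq!(limiter.running_gc_tasks(), 2);
    // A third acquisition exceeds `gc_concurrency` and must fail.
    assert!(limiter.permit().is_err());
}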

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// How long a file removed from the manifest is kept before it becomes
    /// eligible for deletion. `None` disables lingering, so removed files
    /// become eligible immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// Minimum age (by last-modified time) a file unknown to the manifest
    /// must reach before it can be deleted in full-listing mode.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent object-store listers used by a single GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of concurrent GC jobs.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            lingering_time: Some(Duration::from_secs(60)),
            unknown_file_lingering_time: Duration::from_secs(60 * 60),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}
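
// A small sketch of tuning `GcConfig`: start from the defaults and override
// only the knobs of interest (the values here are illustrative, not
// recommendations).
#[cfg(test)]
#[test]
fn gc_config_override_sketch() {
    let config = GcConfig {
        enable: true,
        lingering_time: Some(Duration::from_secs(300)),
        max_concurrent_gc_job: 8,
        ..Default::default()
    };
    assert!(config.enable);
    // Untouched fields keep their defaults.
    assert_eq!(config.max_concurrent_lister_per_gc_job, 32);
}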

/// A worker that scans a set of local regions and deletes files that are no
/// longer referenced.
pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to run GC on.
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    pub(crate) opt: GcConfig,
    /// Manifest of files that are still referenced and must not be deleted.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Permit taken from the [`GcLimiter`], held for the lifetime of this
    /// worker to bound the number of concurrent GC jobs.
    _permit: OwnedSemaphorePermit,
    /// If true, list every file in the region directory from the object store
    /// (slower, but able to find files unknown to the manifest); otherwise
    /// only files recorded as removed in the manifest are considered.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Creates a new [`LocalGcWorker`], acquiring a GC permit from the limiter.
    ///
    /// Returns an error if the maximum number of concurrent GC jobs is reached.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Collects the per-region sets of still-referenced file ids from the
    /// file refs manifest.
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }

    /// Runs GC for all assigned regions sequentially and returns a report of
    /// the files deleted per region.
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_default();
            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Rough number of files each concurrent lister should cover; used to
    /// decide how many listers to spawn for a region.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Runs GC for one region: determines which files are unused, deletes
    /// them, and clears them from the manifest's removed-file records.
    pub async fn do_region_gc(
        &self,
        region: MitoRegionRef,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        let region_id = region.region_id();

        debug!("Doing gc for region {}", region_id);
        let all_entries = if self.full_file_listing {
            self.list_from_object_store(&region).await?
        } else {
            vec![]
        };

        let manifest = region.manifest_ctx.manifest().await;
        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;

        if recently_removed_files.is_empty() {
            debug!("No recently removed files to gc for region {}", region_id);
            // Keep going: full listing may still find untracked files to delete.
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let in_used: HashSet<FileId> = current_files
            .keys()
            .cloned()
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let unused_files = self
            .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
            .await?;

        let unused_file_cnt = unused_files.len();

        debug!(
            "gc: for region {region_id}: in-manifest files: {}, tmp ref file cnt: {}, in-use files: {}, recently removed files: {}, unused files to delete: {}",
            current_files.len(),
            tmp_ref_files.len(),
            in_used.len(),
            removed_file_cnt,
            unused_files.len()
        );

        let file_pairs: Vec<(FileId, u64)> =
            unused_files.iter().map(|file_id| (*file_id, 0)).collect();
        debug!(
            "Found {} unused files to delete for region {}",
            file_pairs.len(),
            region_id
        );

        self.delete_files(region_id, &file_pairs).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        self.update_manifest_removed_files(&region, unused_files.clone())
            .await?;

        Ok(unused_files)
    }

    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
        delete_files(
            region_id,
            file_ids,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        GC_DELETE_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }

    /// Clears the given deleted files from the region manifest's removed-file
    /// records so they are not considered again by later GC runs.
    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<FileId>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        let cnt = deleted_files.len();
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

    /// Groups the files recorded as removed in the region manifest by the
    /// timestamp at which they were removed.
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.file_ids.iter().cloned());
        }

        Ok(ret)
    }

    /// Builds `concurrency` listers over the region directory, each covering a
    /// disjoint, lexicographically contiguous slice of the file name space.
    /// The string paired with each lister is the exclusive upper bound of its
    /// slice (`None` for the last one).
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Lists all file entries under the region directory from the object
    /// store, using multiple concurrent listers for large regions.
    async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
        let start = tokio::time::Instant::now();
        let region_id = region.region_id();
        let manifest = region.manifest_ctx.manifest().await;
        let current_files = &manifest.files;
        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        let all_entries = self.list_region_files_concurrent(listers).await?;
        let cnt = all_entries.len();
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} listers for {cnt} files in region {}.",
            start.elapsed().as_secs_f64(),
            region_id
        );
        Ok(all_entries)
    }

    /// Drives the given listers concurrently, fanning their entries into a
    /// bounded channel and collecting them on the receiving side. Each lister
    /// stops at its exclusive upper bound, if any. (See the illustrative
    /// fan-in sketch after this impl block.)
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            e.name() < end.as_str()
                        } else {
                            true
                        }
                    }
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            e.metadata().is_file()
                        } else {
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // Drop the original sender so `recv` terminates once all worker
        // batches have been drained.
        drop(tx);
        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Decides which of the listed entries can be deleted.
    ///
    /// A file is deletable when it is not in use, not within its lingering
    /// window, and either recorded as removed in the manifest or, for files
    /// unknown to the manifest, old enough (by last-modified time) to be
    /// considered orphaned. Also returns the set of lingering files that
    /// still exist in the object store.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
        let mut all_in_exist_linger_files = HashSet::new();

        for entry in entries {
            let file_id = match location::parse_file_id_from_path(entry.name()) {
                Ok(file_id) => file_id,
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    continue;
                }
            };

            if may_linger_filenames.contains(&file_id) {
                all_in_exist_linger_files.insert(file_id);
            }

            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !eligible_for_removal.contains(&file_id) {
                        // Unknown file: only delete it if it is older than the
                        // unknown-file lingering window.
                        entry
                            .metadata()
                            .last_modified()
                            .map(|t| t < unknown_file_may_linger_until)
                            .unwrap_or(false)
                    } else {
                        true
                    }
                };

            if should_delete {
                all_unused_files_ready_for_delete.push(file_id);
            }
        }

        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
    }

    /// Computes the list of files that can be deleted for `region_id`.
    ///
    /// Files removed from the manifest within `lingering_time` are kept, as
    /// they may still be referenced by in-flight readers; older removed files
    /// become eligible for deletion. In full-listing mode, files found in the
    /// object store but unknown to the manifest are also deleted once they are
    /// older than `unknown_file_lingering_time`. (See the lingering split
    /// sketch after this impl block.)
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_used: &HashSet<FileId>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<FileId>> {
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
        // Files removed at or after the threshold may still linger; files
        // removed before it are eligible for deletion.
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        if !self.full_file_listing {
            // Fast path: trust the manifest's removed-file records and skip
            // listing the object store entirely.
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
}
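
// A minimal fan-out/fan-in sketch (illustrative only) of the pattern used in
// `list_region_files_concurrent`: each worker sends one batch over a bounded
// mpsc channel, the original sender is dropped, and the receiver drains until
// the channel closes. Assumes tokio's test macro is available to this crate's
// test builds, as the existing async tests already rely on it.
#[cfg(test)]
#[tokio::test]
async fn fan_in_collect_sketch() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u32>>(16);
    let mut handles = vec![];
    for batch in [vec![1, 2], vec![3], vec![4, 5, 6]] {
        let tx = tx.clone();
        handles.push(tokio::spawn(async move {
            tx.send(batch).await.expect("receiver alive");
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    // Drop the last sender so `recv` returns `None` once all batches are read.
    drop(tx);
    let mut all = vec![];
    while let Some(batch) = rx.recv().await {
        all.extend(batch);
    }
    all.sort_unstable();
    assert_eq!(all, vec![1, 2, 3, 4, 5, 6]);
}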
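
// A sketch of the lingering-window split used in `list_to_be_deleted_files`:
// `BTreeMap::split_off(&threshold)` keeps entries removed strictly before the
// threshold (eligible for deletion) and returns entries at or after it (which
// may still linger).
#[cfg(test)]
#[test]
fn lingering_split_sketch() {
    let mut removed: BTreeMap<Timestamp, HashSet<FileId>> = BTreeMap::new();
    removed.insert(Timestamp::new_millisecond(1_000), HashSet::new());
    removed.insert(Timestamp::new_millisecond(2_000), HashSet::new());
    removed.insert(Timestamp::new_millisecond(3_000), HashSet::new());

    let threshold = Timestamp::new_millisecond(2_000);
    let may_linger = removed.split_off(&threshold);

    assert_eq!(removed.len(), 1); // removed before the threshold: eligible
    assert_eq!(may_linger.len(), 2); // removed at or after it: may linger
}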

/// Rounds `concurrency` up to a power of two `n` and returns `n - 1` hex
/// string boundaries that split the hexadecimal file-name space into `n`
/// equally sized partitions, e.g. `["4", "8", "c"]` for a concurrency of 4.
///
/// Returns an empty vector when a single lister is enough.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // Find the smallest number of hex digits `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
}