mito2/sst/index/indexer/
abort.rs1use common_telemetry::warn;
16
17use crate::access_layer::TempFileCleaner;
18use crate::sst::file::{RegionFileId, RegionIndexId};
19use crate::sst::index::Indexer;
20
21impl Indexer {
22 pub(crate) async fn do_abort(&mut self) {
23 self.do_abort_inverted_index().await;
24 self.do_abort_fulltext_index().await;
25 self.do_abort_bloom_filter().await;
26 self.do_prune_intm_sst_dir().await;
27 if self.write_cache_enabled {
28 self.do_abort_clean_fs_temp_dir().await;
29 }
30 self.puffin_manager = None;
31 }
32
33 async fn do_abort_inverted_index(&mut self) {
34 let Some(mut indexer) = self.inverted_indexer.take() else {
35 return;
36 };
37 let Err(err) = indexer.abort().await else {
38 return;
39 };
40
41 if cfg!(any(test, feature = "test")) {
42 panic!(
43 "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}",
44 self.region_id, self.file_id, err
45 );
46 } else {
47 warn!(
48 err; "Failed to abort inverted index, region_id: {}, file_id: {}",
49 self.region_id, self.file_id,
50 );
51 }
52 }
53
54 async fn do_abort_fulltext_index(&mut self) {
55 let Some(mut indexer) = self.fulltext_indexer.take() else {
56 return;
57 };
58 let Err(err) = indexer.abort().await else {
59 return;
60 };
61
62 if cfg!(any(test, feature = "test")) {
63 panic!(
64 "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}",
65 self.region_id, self.file_id, err
66 );
67 } else {
68 warn!(
69 err; "Failed to abort full-text index, region_id: {}, file_id: {}",
70 self.region_id, self.file_id,
71 );
72 }
73 }
74
75 async fn do_abort_bloom_filter(&mut self) {
76 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
77 return;
78 };
79 let Err(err) = indexer.abort().await else {
80 return;
81 };
82
83 if cfg!(any(test, feature = "test")) {
84 panic!(
85 "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}",
86 self.region_id, self.file_id, err
87 );
88 } else {
89 warn!(
90 err; "Failed to abort bloom filter, region_id: {}, file_id: {}",
91 self.region_id, self.file_id,
92 );
93 }
94 }
95
96 async fn do_abort_clean_fs_temp_dir(&mut self) {
97 let Some(puffin_manager) = &self.puffin_manager else {
98 return;
99 };
100 let fs_accessor = puffin_manager.file_accessor();
101
102 let fs_handle = RegionIndexId::new(
103 RegionFileId::new(self.region_id, self.file_id),
104 self.index_version,
105 )
106 .to_string();
107 TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
108 }
109}