mito2/sst/index/indexer/
abort.rs1use common_telemetry::warn;
16
17use crate::access_layer::TempFileCleaner;
18use crate::sst::file::{RegionFileId, RegionIndexId};
19use crate::sst::index::Indexer;
20
21impl Indexer {
22 pub(crate) async fn do_abort(&mut self) {
23 self.do_abort_inverted_index().await;
24 self.do_abort_fulltext_index().await;
25 self.do_abort_bloom_filter().await;
26 #[cfg(feature = "vector_index")]
27 self.do_abort_vector_index().await;
28 self.do_prune_intm_sst_dir().await;
29 if self.write_cache_enabled {
30 self.do_abort_clean_fs_temp_dir().await;
31 }
32 self.puffin_manager = None;
33 }
34
35 async fn do_abort_inverted_index(&mut self) {
36 let Some(mut indexer) = self.inverted_indexer.take() else {
37 return;
38 };
39 let Err(err) = indexer.abort().await else {
40 return;
41 };
42
43 if cfg!(any(test, feature = "test")) {
44 panic!(
45 "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}",
46 self.region_id, self.file_id, err
47 );
48 } else {
49 warn!(
50 err; "Failed to abort inverted index, region_id: {}, file_id: {}",
51 self.region_id, self.file_id,
52 );
53 }
54 }
55
56 async fn do_abort_fulltext_index(&mut self) {
57 let Some(mut indexer) = self.fulltext_indexer.take() else {
58 return;
59 };
60 let Err(err) = indexer.abort().await else {
61 return;
62 };
63
64 if cfg!(any(test, feature = "test")) {
65 panic!(
66 "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}",
67 self.region_id, self.file_id, err
68 );
69 } else {
70 warn!(
71 err; "Failed to abort full-text index, region_id: {}, file_id: {}",
72 self.region_id, self.file_id,
73 );
74 }
75 }
76
77 async fn do_abort_bloom_filter(&mut self) {
78 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
79 return;
80 };
81 let Err(err) = indexer.abort().await else {
82 return;
83 };
84
85 if cfg!(any(test, feature = "test")) {
86 panic!(
87 "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}",
88 self.region_id, self.file_id, err
89 );
90 } else {
91 warn!(
92 err; "Failed to abort bloom filter, region_id: {}, file_id: {}",
93 self.region_id, self.file_id,
94 );
95 }
96 }
97
98 async fn do_abort_clean_fs_temp_dir(&mut self) {
99 let Some(puffin_manager) = &self.puffin_manager else {
100 return;
101 };
102 let fs_accessor = puffin_manager.file_accessor();
103
104 let fs_handle = RegionIndexId::new(
105 RegionFileId::new(self.region_id, self.file_id),
106 self.index_version,
107 )
108 .to_string();
109 TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
110 }
111
112 #[cfg(feature = "vector_index")]
113 async fn do_abort_vector_index(&mut self) {
114 let Some(mut indexer) = self.vector_indexer.take() else {
115 return;
116 };
117 let Err(err) = indexer.abort().await else {
118 return;
119 };
120
121 if cfg!(any(test, feature = "test")) {
122 panic!(
123 "Failed to abort vector index, region_id: {}, file_id: {}, err: {:?}",
124 self.region_id, self.file_id, err
125 );
126 } else {
127 warn!(
128 err; "Failed to abort vector index, region_id: {}, file_id: {}",
129 self.region_id, self.file_id,
130 );
131 }
132 }
133}