mito2/sst/index/indexer/
abort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16
17use crate::access_layer::TempFileCleaner;
18use crate::sst::file::{RegionFileId, RegionIndexId};
19use crate::sst::index::Indexer;
20
21impl Indexer {
22    pub(crate) async fn do_abort(&mut self) {
23        self.do_abort_inverted_index().await;
24        self.do_abort_fulltext_index().await;
25        self.do_abort_bloom_filter().await;
26        #[cfg(feature = "vector_index")]
27        self.do_abort_vector_index().await;
28        self.do_prune_intm_sst_dir().await;
29        if self.write_cache_enabled {
30            self.do_abort_clean_fs_temp_dir().await;
31        }
32        self.puffin_manager = None;
33    }
34
35    async fn do_abort_inverted_index(&mut self) {
36        let Some(mut indexer) = self.inverted_indexer.take() else {
37            return;
38        };
39        let Err(err) = indexer.abort().await else {
40            return;
41        };
42
43        if cfg!(any(test, feature = "test")) {
44            panic!(
45                "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}",
46                self.region_id, self.file_id, err
47            );
48        } else {
49            warn!(
50                err; "Failed to abort inverted index, region_id: {}, file_id: {}",
51                self.region_id, self.file_id,
52            );
53        }
54    }
55
56    async fn do_abort_fulltext_index(&mut self) {
57        let Some(mut indexer) = self.fulltext_indexer.take() else {
58            return;
59        };
60        let Err(err) = indexer.abort().await else {
61            return;
62        };
63
64        if cfg!(any(test, feature = "test")) {
65            panic!(
66                "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}",
67                self.region_id, self.file_id, err
68            );
69        } else {
70            warn!(
71                err; "Failed to abort full-text index, region_id: {}, file_id: {}",
72                self.region_id, self.file_id,
73            );
74        }
75    }
76
77    async fn do_abort_bloom_filter(&mut self) {
78        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
79            return;
80        };
81        let Err(err) = indexer.abort().await else {
82            return;
83        };
84
85        if cfg!(any(test, feature = "test")) {
86            panic!(
87                "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}",
88                self.region_id, self.file_id, err
89            );
90        } else {
91            warn!(
92                err; "Failed to abort bloom filter, region_id: {}, file_id: {}",
93                self.region_id, self.file_id,
94            );
95        }
96    }
97
98    async fn do_abort_clean_fs_temp_dir(&mut self) {
99        let Some(puffin_manager) = &self.puffin_manager else {
100            return;
101        };
102        let fs_accessor = puffin_manager.file_accessor();
103
104        let fs_handle = RegionIndexId::new(
105            RegionFileId::new(self.region_id, self.file_id),
106            self.index_version,
107        )
108        .to_string();
109        TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
110    }
111
112    #[cfg(feature = "vector_index")]
113    async fn do_abort_vector_index(&mut self) {
114        let Some(mut indexer) = self.vector_indexer.take() else {
115            return;
116        };
117        let Err(err) = indexer.abort().await else {
118            return;
119        };
120
121        if cfg!(any(test, feature = "test")) {
122            panic!(
123                "Failed to abort vector index, region_id: {}, file_id: {}, err: {:?}",
124                self.region_id, self.file_id, err
125            );
126        } else {
127            warn!(
128                err; "Failed to abort vector index, region_id: {}, file_id: {}",
129                self.region_id, self.file_id,
130            );
131        }
132    }
133}