// mito2/sst/index/indexer/finish.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::RegionFileId;
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28        let mut output = IndexOutput::default();
29
30        let Some(mut writer) = self.build_puffin_writer().await else {
31            self.do_abort().await;
32            return output;
33        };
34
35        let success = self
36            .do_finish_inverted_index(&mut writer, &mut output)
37            .await;
38        if !success {
39            self.do_abort().await;
40            return IndexOutput::default();
41        }
42
43        let success = self
44            .do_finish_fulltext_index(&mut writer, &mut output)
45            .await;
46        if !success {
47            self.do_abort().await;
48            return IndexOutput::default();
49        }
50
51        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52        if !success {
53            self.do_abort().await;
54            return IndexOutput::default();
55        }
56
57        self.do_prune_intm_sst_dir().await;
58        output.file_size = self.do_finish_puffin_writer(writer).await;
59        output
60    }
61
62    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
63        let puffin_manager = self.puffin_manager.take()?;
64
65        let err = match puffin_manager
66            .writer(&RegionFileId::new(self.region_id, self.file_id))
67            .await
68        {
69            Ok(writer) => return Some(writer),
70            Err(err) => err,
71        };
72
73        if cfg!(any(test, feature = "test")) {
74            panic!(
75                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
76                self.region_id, self.file_id, err
77            );
78        } else {
79            warn!(
80                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
81                self.region_id, self.file_id,
82            );
83        }
84
85        None
86    }
87
88    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
89        let err = match writer.finish().await {
90            Ok(size) => return size,
91            Err(err) => err,
92        };
93
94        if cfg!(any(test, feature = "test")) {
95            panic!(
96                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
97                self.region_id, self.file_id, err
98            );
99        } else {
100            warn!(
101                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
102                self.region_id, self.file_id,
103            );
104        }
105
106        0
107    }
108
109    /// Returns false if the finish failed.
110    async fn do_finish_inverted_index(
111        &mut self,
112        puffin_writer: &mut SstPuffinWriter,
113        index_output: &mut IndexOutput,
114    ) -> bool {
115        let Some(mut indexer) = self.inverted_indexer.take() else {
116            return true;
117        };
118
119        let column_ids = indexer.column_ids().collect();
120        let err = match indexer.finish(puffin_writer).await {
121            Ok((row_count, byte_count)) => {
122                self.fill_inverted_index_output(
123                    &mut index_output.inverted_index,
124                    row_count,
125                    byte_count,
126                    column_ids,
127                );
128                return true;
129            }
130            Err(err) => err,
131        };
132
133        if cfg!(any(test, feature = "test")) {
134            panic!(
135                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
136                self.region_id, self.file_id, err
137            );
138        } else {
139            warn!(
140                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
141                self.region_id, self.file_id,
142            );
143        }
144
145        false
146    }
147
148    async fn do_finish_fulltext_index(
149        &mut self,
150        puffin_writer: &mut SstPuffinWriter,
151        index_output: &mut IndexOutput,
152    ) -> bool {
153        let Some(mut indexer) = self.fulltext_indexer.take() else {
154            return true;
155        };
156
157        let column_ids = indexer.column_ids().collect();
158        let err = match indexer.finish(puffin_writer).await {
159            Ok((row_count, byte_count)) => {
160                self.fill_fulltext_index_output(
161                    &mut index_output.fulltext_index,
162                    row_count,
163                    byte_count,
164                    column_ids,
165                );
166                return true;
167            }
168            Err(err) => err,
169        };
170
171        if cfg!(any(test, feature = "test")) {
172            panic!(
173                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
174                self.region_id, self.file_id, err
175            );
176        } else {
177            warn!(
178                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
179                self.region_id, self.file_id,
180            );
181        }
182
183        false
184    }
185
186    async fn do_finish_bloom_filter(
187        &mut self,
188        puffin_writer: &mut SstPuffinWriter,
189        index_output: &mut IndexOutput,
190    ) -> bool {
191        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
192            return true;
193        };
194
195        let column_ids = indexer.column_ids().collect();
196        let err = match indexer.finish(puffin_writer).await {
197            Ok((row_count, byte_count)) => {
198                self.fill_bloom_filter_output(
199                    &mut index_output.bloom_filter,
200                    row_count,
201                    byte_count,
202                    column_ids,
203                );
204                return true;
205            }
206            Err(err) => err,
207        };
208
209        if cfg!(any(test, feature = "test")) {
210            panic!(
211                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
212                self.region_id, self.file_id, err
213            );
214        } else {
215            warn!(
216                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
217                self.region_id, self.file_id,
218            );
219        }
220
221        false
222    }
223
224    fn fill_inverted_index_output(
225        &mut self,
226        output: &mut InvertedIndexOutput,
227        row_count: RowCount,
228        byte_count: ByteCount,
229        column_ids: Vec<ColumnId>,
230    ) {
231        debug!(
232            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
233            self.region_id, self.file_id, byte_count, row_count, column_ids
234        );
235
236        output.index_size = byte_count;
237        output.row_count = row_count;
238        output.columns = column_ids;
239    }
240
241    fn fill_fulltext_index_output(
242        &mut self,
243        output: &mut FulltextIndexOutput,
244        row_count: RowCount,
245        byte_count: ByteCount,
246        column_ids: Vec<ColumnId>,
247    ) {
248        debug!(
249            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
250            self.region_id, self.file_id, byte_count, row_count, column_ids
251        );
252
253        output.index_size = byte_count;
254        output.row_count = row_count;
255        output.columns = column_ids;
256    }
257
258    fn fill_bloom_filter_output(
259        &mut self,
260        output: &mut BloomFilterOutput,
261        row_count: RowCount,
262        byte_count: ByteCount,
263        column_ids: Vec<ColumnId>,
264    ) {
265        debug!(
266            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
267            self.region_id, self.file_id, byte_count, row_count, column_ids
268        );
269
270        output.index_size = byte_count;
271        output.row_count = row_count;
272        output.columns = column_ids;
273    }
274
275    pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
276        if let Some(manager) = self.intermediate_manager.take() {
277            if let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await {
278                warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
279            }
280        }
281    }
282}