mito2/sst/index/indexer/
finish.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::{RegionFileId, RegionIndexId};
20#[cfg(feature = "vector_index")]
21use crate::sst::index::VectorIndexOutput;
22use crate::sst::index::puffin_manager::SstPuffinWriter;
23use crate::sst::index::statistics::{ByteCount, RowCount};
24use crate::sst::index::{
25    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
26};
27
28impl Indexer {
29    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
30        let mut output = IndexOutput::default();
31
32        let Some(mut writer) = self.build_puffin_writer().await else {
33            self.do_abort().await;
34            return output;
35        };
36
37        let success = self
38            .do_finish_inverted_index(&mut writer, &mut output)
39            .await;
40        if !success {
41            self.do_abort().await;
42            return IndexOutput::default();
43        }
44
45        let success = self
46            .do_finish_fulltext_index(&mut writer, &mut output)
47            .await;
48        if !success {
49            self.do_abort().await;
50            return IndexOutput::default();
51        }
52
53        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
54        if !success {
55            self.do_abort().await;
56            return IndexOutput::default();
57        }
58
59        #[cfg(feature = "vector_index")]
60        {
61            let success = self.do_finish_vector_index(&mut writer, &mut output).await;
62            if !success {
63                self.do_abort().await;
64                return IndexOutput::default();
65            }
66        }
67
68        self.do_prune_intm_sst_dir().await;
69        output.file_size = self.do_finish_puffin_writer(writer).await;
70        output.version = self.index_version;
71        output
72    }
73
74    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
75        let puffin_manager = self.puffin_manager.clone()?;
76
77        let err = match puffin_manager
78            .writer(&RegionIndexId::new(
79                RegionFileId::new(self.region_id, self.file_id),
80                self.index_version,
81            ))
82            .await
83        {
84            Ok(writer) => return Some(writer),
85            Err(err) => err,
86        };
87
88        if cfg!(any(test, feature = "test")) {
89            panic!(
90                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
91                self.region_id, self.file_id, err
92            );
93        } else {
94            warn!(
95                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
96                self.region_id, self.file_id,
97            );
98        }
99
100        None
101    }
102
103    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
104        let err = match writer.finish().await {
105            Ok(size) => return size,
106            Err(err) => err,
107        };
108
109        if cfg!(any(test, feature = "test")) {
110            panic!(
111                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
112                self.region_id, self.file_id, err
113            );
114        } else {
115            warn!(
116                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
117                self.region_id, self.file_id,
118            );
119        }
120
121        0
122    }
123
124    /// Returns false if the finish failed.
125    async fn do_finish_inverted_index(
126        &mut self,
127        puffin_writer: &mut SstPuffinWriter,
128        index_output: &mut IndexOutput,
129    ) -> bool {
130        let Some(mut indexer) = self.inverted_indexer.take() else {
131            return true;
132        };
133
134        let column_ids = indexer.column_ids().collect();
135        let err = match indexer.finish(puffin_writer).await {
136            Ok((row_count, byte_count)) => {
137                self.fill_inverted_index_output(
138                    &mut index_output.inverted_index,
139                    row_count,
140                    byte_count,
141                    column_ids,
142                );
143                return true;
144            }
145            Err(err) => err,
146        };
147
148        if cfg!(any(test, feature = "test")) {
149            panic!(
150                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
151                self.region_id, self.file_id, err
152            );
153        } else {
154            warn!(
155                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
156                self.region_id, self.file_id,
157            );
158        }
159
160        false
161    }
162
163    async fn do_finish_fulltext_index(
164        &mut self,
165        puffin_writer: &mut SstPuffinWriter,
166        index_output: &mut IndexOutput,
167    ) -> bool {
168        let Some(mut indexer) = self.fulltext_indexer.take() else {
169            return true;
170        };
171
172        let column_ids = indexer.column_ids().collect();
173        let err = match indexer.finish(puffin_writer).await {
174            Ok((row_count, byte_count)) => {
175                self.fill_fulltext_index_output(
176                    &mut index_output.fulltext_index,
177                    row_count,
178                    byte_count,
179                    column_ids,
180                );
181                return true;
182            }
183            Err(err) => err,
184        };
185
186        if cfg!(any(test, feature = "test")) {
187            panic!(
188                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
189                self.region_id, self.file_id, err
190            );
191        } else {
192            warn!(
193                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
194                self.region_id, self.file_id,
195            );
196        }
197
198        false
199    }
200
201    async fn do_finish_bloom_filter(
202        &mut self,
203        puffin_writer: &mut SstPuffinWriter,
204        index_output: &mut IndexOutput,
205    ) -> bool {
206        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
207            return true;
208        };
209
210        let column_ids = indexer.column_ids().collect();
211        let err = match indexer.finish(puffin_writer).await {
212            Ok((row_count, byte_count)) => {
213                self.fill_bloom_filter_output(
214                    &mut index_output.bloom_filter,
215                    row_count,
216                    byte_count,
217                    column_ids,
218                );
219                return true;
220            }
221            Err(err) => err,
222        };
223
224        if cfg!(any(test, feature = "test")) {
225            panic!(
226                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
227                self.region_id, self.file_id, err
228            );
229        } else {
230            warn!(
231                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
232                self.region_id, self.file_id,
233            );
234        }
235
236        false
237    }
238
239    fn fill_inverted_index_output(
240        &mut self,
241        output: &mut InvertedIndexOutput,
242        row_count: RowCount,
243        byte_count: ByteCount,
244        column_ids: Vec<ColumnId>,
245    ) {
246        debug!(
247            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
248            self.region_id, self.file_id, byte_count, row_count, column_ids
249        );
250
251        output.index_size = byte_count;
252        output.row_count = row_count;
253        output.columns = column_ids;
254    }
255
256    fn fill_fulltext_index_output(
257        &mut self,
258        output: &mut FulltextIndexOutput,
259        row_count: RowCount,
260        byte_count: ByteCount,
261        column_ids: Vec<ColumnId>,
262    ) {
263        debug!(
264            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
265            self.region_id, self.file_id, byte_count, row_count, column_ids
266        );
267
268        output.index_size = byte_count;
269        output.row_count = row_count;
270        output.columns = column_ids;
271    }
272
273    fn fill_bloom_filter_output(
274        &mut self,
275        output: &mut BloomFilterOutput,
276        row_count: RowCount,
277        byte_count: ByteCount,
278        column_ids: Vec<ColumnId>,
279    ) {
280        debug!(
281            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
282            self.region_id, self.file_id, byte_count, row_count, column_ids
283        );
284
285        output.index_size = byte_count;
286        output.row_count = row_count;
287        output.columns = column_ids;
288    }
289
290    #[cfg(feature = "vector_index")]
291    async fn do_finish_vector_index(
292        &mut self,
293        puffin_writer: &mut SstPuffinWriter,
294        index_output: &mut IndexOutput,
295    ) -> bool {
296        let Some(mut indexer) = self.vector_indexer.take() else {
297            return true;
298        };
299
300        let column_ids = indexer.column_ids().collect();
301        let err = match indexer.finish(puffin_writer).await {
302            Ok((row_count, byte_count)) => {
303                self.fill_vector_index_output(
304                    &mut index_output.vector_index,
305                    row_count,
306                    byte_count,
307                    column_ids,
308                );
309                return true;
310            }
311            Err(err) => err,
312        };
313
314        if cfg!(any(test, feature = "test")) {
315            panic!(
316                "Failed to finish vector index, region_id: {}, file_id: {}, err: {:?}",
317                self.region_id, self.file_id, err
318            );
319        } else {
320            warn!(
321                err; "Failed to finish vector index, region_id: {}, file_id: {}",
322                self.region_id, self.file_id,
323            );
324        }
325
326        false
327    }
328
329    #[cfg(feature = "vector_index")]
330    fn fill_vector_index_output(
331        &mut self,
332        output: &mut VectorIndexOutput,
333        row_count: RowCount,
334        byte_count: ByteCount,
335        column_ids: Vec<ColumnId>,
336    ) {
337        debug!(
338            "Vector index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
339            self.region_id, self.file_id, byte_count, row_count, column_ids
340        );
341
342        output.index_size = byte_count;
343        output.row_count = row_count;
344        output.columns = column_ids;
345    }
346
347    pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
348        if let Some(manager) = self.intermediate_manager.take()
349            && let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await
350        {
351            warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
352        }
353    }
354}