mito2/sst/index/indexer/
finish.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17
18use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
19use crate::sst::index::fulltext_index::creator::FulltextIndexer;
20use crate::sst::index::inverted_index::creator::InvertedIndexer;
21use crate::sst::index::puffin_manager::SstPuffinWriter;
22use crate::sst::index::statistics::{ByteCount, RowCount};
23use crate::sst::index::{
24    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
25};
26
27impl Indexer {
28    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
29        let mut output = IndexOutput::default();
30
31        let Some(mut writer) = self.build_puffin_writer().await else {
32            self.do_abort().await;
33            return output;
34        };
35
36        let success = self
37            .do_finish_inverted_index(&mut writer, &mut output)
38            .await;
39        if !success {
40            self.do_abort().await;
41            return IndexOutput::default();
42        }
43
44        let success = self
45            .do_finish_fulltext_index(&mut writer, &mut output)
46            .await;
47        if !success {
48            self.do_abort().await;
49            return IndexOutput::default();
50        }
51
52        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
53        if !success {
54            self.do_abort().await;
55            return IndexOutput::default();
56        }
57
58        output.file_size = self.do_finish_puffin_writer(writer).await;
59        output
60    }
61
62    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
63        let puffin_manager = self.puffin_manager.take()?;
64
65        let err = match puffin_manager.writer(&self.file_id).await {
66            Ok(writer) => return Some(writer),
67            Err(err) => err,
68        };
69
70        if cfg!(any(test, feature = "test")) {
71            panic!(
72                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
73                self.region_id, self.file_id, err
74            );
75        } else {
76            warn!(
77                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
78                self.region_id, self.file_id,
79            );
80        }
81
82        None
83    }
84
85    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
86        let err = match writer.finish().await {
87            Ok(size) => return size,
88            Err(err) => err,
89        };
90
91        if cfg!(any(test, feature = "test")) {
92            panic!(
93                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
94                self.region_id, self.file_id, err
95            );
96        } else {
97            warn!(
98                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
99                self.region_id, self.file_id,
100            );
101        }
102
103        0
104    }
105
106    /// Returns false if the finish failed.
107    async fn do_finish_inverted_index(
108        &mut self,
109        puffin_writer: &mut SstPuffinWriter,
110        index_output: &mut IndexOutput,
111    ) -> bool {
112        let Some(mut indexer) = self.inverted_indexer.take() else {
113            return true;
114        };
115
116        let err = match indexer.finish(puffin_writer).await {
117            Ok((row_count, byte_count)) => {
118                self.fill_inverted_index_output(
119                    &mut index_output.inverted_index,
120                    row_count,
121                    byte_count,
122                    &indexer,
123                );
124                return true;
125            }
126            Err(err) => err,
127        };
128
129        if cfg!(any(test, feature = "test")) {
130            panic!(
131                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
132                self.region_id, self.file_id, err
133            );
134        } else {
135            warn!(
136                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
137                self.region_id, self.file_id,
138            );
139        }
140
141        false
142    }
143
144    async fn do_finish_fulltext_index(
145        &mut self,
146        puffin_writer: &mut SstPuffinWriter,
147        index_output: &mut IndexOutput,
148    ) -> bool {
149        let Some(mut indexer) = self.fulltext_indexer.take() else {
150            return true;
151        };
152
153        let err = match indexer.finish(puffin_writer).await {
154            Ok((row_count, byte_count)) => {
155                self.fill_fulltext_index_output(
156                    &mut index_output.fulltext_index,
157                    row_count,
158                    byte_count,
159                    &indexer,
160                );
161                return true;
162            }
163            Err(err) => err,
164        };
165
166        if cfg!(any(test, feature = "test")) {
167            panic!(
168                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
169                self.region_id, self.file_id, err
170            );
171        } else {
172            warn!(
173                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
174                self.region_id, self.file_id,
175            );
176        }
177
178        false
179    }
180
181    async fn do_finish_bloom_filter(
182        &mut self,
183        puffin_writer: &mut SstPuffinWriter,
184        index_output: &mut IndexOutput,
185    ) -> bool {
186        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
187            return true;
188        };
189
190        let err = match indexer.finish(puffin_writer).await {
191            Ok((row_count, byte_count)) => {
192                self.fill_bloom_filter_output(
193                    &mut index_output.bloom_filter,
194                    row_count,
195                    byte_count,
196                    &indexer,
197                );
198                return true;
199            }
200            Err(err) => err,
201        };
202
203        if cfg!(any(test, feature = "test")) {
204            panic!(
205                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
206                self.region_id, self.file_id, err
207            );
208        } else {
209            warn!(
210                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
211                self.region_id, self.file_id,
212            );
213        }
214
215        false
216    }
217
218    fn fill_inverted_index_output(
219        &mut self,
220        output: &mut InvertedIndexOutput,
221        row_count: RowCount,
222        byte_count: ByteCount,
223        indexer: &InvertedIndexer,
224    ) {
225        debug!(
226            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
227            self.region_id, self.file_id, byte_count, row_count
228        );
229
230        output.index_size = byte_count;
231        output.row_count = row_count;
232        output.columns = indexer.column_ids().collect();
233    }
234
235    fn fill_fulltext_index_output(
236        &mut self,
237        output: &mut FulltextIndexOutput,
238        row_count: RowCount,
239        byte_count: ByteCount,
240        indexer: &FulltextIndexer,
241    ) {
242        debug!(
243            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
244            self.region_id, self.file_id, byte_count, row_count
245        );
246
247        output.index_size = byte_count;
248        output.row_count = row_count;
249        output.columns = indexer.column_ids().collect();
250    }
251
252    fn fill_bloom_filter_output(
253        &mut self,
254        output: &mut BloomFilterOutput,
255        row_count: RowCount,
256        byte_count: ByteCount,
257        indexer: &BloomFilterIndexer,
258    ) {
259        debug!(
260            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
261            self.region_id, self.file_id, byte_count, row_count
262        );
263
264        output.index_size = byte_count;
265        output.row_count = row_count;
266        output.columns = indexer.column_ids().collect();
267    }
268}