mito2/sst/index/indexer/
finish.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::index::puffin_manager::SstPuffinWriter;
20use crate::sst::index::statistics::{ByteCount, RowCount};
21use crate::sst::index::{
22    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
23};
24
25impl Indexer {
26    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
27        let mut output = IndexOutput::default();
28
29        let Some(mut writer) = self.build_puffin_writer().await else {
30            self.do_abort().await;
31            return output;
32        };
33
34        let success = self
35            .do_finish_inverted_index(&mut writer, &mut output)
36            .await;
37        if !success {
38            self.do_abort().await;
39            return IndexOutput::default();
40        }
41
42        let success = self
43            .do_finish_fulltext_index(&mut writer, &mut output)
44            .await;
45        if !success {
46            self.do_abort().await;
47            return IndexOutput::default();
48        }
49
50        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
51        if !success {
52            self.do_abort().await;
53            return IndexOutput::default();
54        }
55
56        output.file_size = self.do_finish_puffin_writer(writer).await;
57        output
58    }
59
60    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
61        let puffin_manager = self.puffin_manager.take()?;
62
63        let err = match puffin_manager.writer(&self.file_id).await {
64            Ok(writer) => return Some(writer),
65            Err(err) => err,
66        };
67
68        if cfg!(any(test, feature = "test")) {
69            panic!(
70                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
71                self.region_id, self.file_id, err
72            );
73        } else {
74            warn!(
75                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
76                self.region_id, self.file_id,
77            );
78        }
79
80        None
81    }
82
83    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
84        let err = match writer.finish().await {
85            Ok(size) => return size,
86            Err(err) => err,
87        };
88
89        if cfg!(any(test, feature = "test")) {
90            panic!(
91                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
92                self.region_id, self.file_id, err
93            );
94        } else {
95            warn!(
96                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
97                self.region_id, self.file_id,
98            );
99        }
100
101        0
102    }
103
104    /// Returns false if the finish failed.
105    async fn do_finish_inverted_index(
106        &mut self,
107        puffin_writer: &mut SstPuffinWriter,
108        index_output: &mut IndexOutput,
109    ) -> bool {
110        let Some(mut indexer) = self.inverted_indexer.take() else {
111            return true;
112        };
113
114        let column_ids = indexer.column_ids().collect();
115        let err = match indexer.finish(puffin_writer).await {
116            Ok((row_count, byte_count)) => {
117                self.fill_inverted_index_output(
118                    &mut index_output.inverted_index,
119                    row_count,
120                    byte_count,
121                    column_ids,
122                );
123                return true;
124            }
125            Err(err) => err,
126        };
127
128        if cfg!(any(test, feature = "test")) {
129            panic!(
130                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
131                self.region_id, self.file_id, err
132            );
133        } else {
134            warn!(
135                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
136                self.region_id, self.file_id,
137            );
138        }
139
140        false
141    }
142
143    async fn do_finish_fulltext_index(
144        &mut self,
145        puffin_writer: &mut SstPuffinWriter,
146        index_output: &mut IndexOutput,
147    ) -> bool {
148        let Some(mut indexer) = self.fulltext_indexer.take() else {
149            return true;
150        };
151
152        let column_ids = indexer.column_ids().collect();
153        let err = match indexer.finish(puffin_writer).await {
154            Ok((row_count, byte_count)) => {
155                self.fill_fulltext_index_output(
156                    &mut index_output.fulltext_index,
157                    row_count,
158                    byte_count,
159                    column_ids,
160                );
161                return true;
162            }
163            Err(err) => err,
164        };
165
166        if cfg!(any(test, feature = "test")) {
167            panic!(
168                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
169                self.region_id, self.file_id, err
170            );
171        } else {
172            warn!(
173                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
174                self.region_id, self.file_id,
175            );
176        }
177
178        false
179    }
180
181    async fn do_finish_bloom_filter(
182        &mut self,
183        puffin_writer: &mut SstPuffinWriter,
184        index_output: &mut IndexOutput,
185    ) -> bool {
186        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
187            return true;
188        };
189
190        let column_ids = indexer.column_ids().collect();
191        let err = match indexer.finish(puffin_writer).await {
192            Ok((row_count, byte_count)) => {
193                self.fill_bloom_filter_output(
194                    &mut index_output.bloom_filter,
195                    row_count,
196                    byte_count,
197                    column_ids,
198                );
199                return true;
200            }
201            Err(err) => err,
202        };
203
204        if cfg!(any(test, feature = "test")) {
205            panic!(
206                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
207                self.region_id, self.file_id, err
208            );
209        } else {
210            warn!(
211                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
212                self.region_id, self.file_id,
213            );
214        }
215
216        false
217    }
218
219    fn fill_inverted_index_output(
220        &mut self,
221        output: &mut InvertedIndexOutput,
222        row_count: RowCount,
223        byte_count: ByteCount,
224        column_ids: Vec<ColumnId>,
225    ) {
226        debug!(
227            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
228            self.region_id, self.file_id, byte_count, row_count, column_ids
229        );
230
231        output.index_size = byte_count;
232        output.row_count = row_count;
233        output.columns = column_ids;
234    }
235
236    fn fill_fulltext_index_output(
237        &mut self,
238        output: &mut FulltextIndexOutput,
239        row_count: RowCount,
240        byte_count: ByteCount,
241        column_ids: Vec<ColumnId>,
242    ) {
243        debug!(
244            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
245            self.region_id, self.file_id, byte_count, row_count, column_ids
246        );
247
248        output.index_size = byte_count;
249        output.row_count = row_count;
250        output.columns = column_ids;
251    }
252
253    fn fill_bloom_filter_output(
254        &mut self,
255        output: &mut BloomFilterOutput,
256        row_count: RowCount,
257        byte_count: ByteCount,
258        column_ids: Vec<ColumnId>,
259    ) {
260        debug!(
261            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
262            self.region_id, self.file_id, byte_count, row_count, column_ids
263        );
264
265        output.index_size = byte_count;
266        output.row_count = row_count;
267        output.columns = column_ids;
268    }
269}