mito2/sst/index/indexer/
finish.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::{RegionFileId, RegionIndexId};
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28        let mut output = IndexOutput::default();
29
30        let Some(mut writer) = self.build_puffin_writer().await else {
31            self.do_abort().await;
32            return output;
33        };
34
35        let success = self
36            .do_finish_inverted_index(&mut writer, &mut output)
37            .await;
38        if !success {
39            self.do_abort().await;
40            return IndexOutput::default();
41        }
42
43        let success = self
44            .do_finish_fulltext_index(&mut writer, &mut output)
45            .await;
46        if !success {
47            self.do_abort().await;
48            return IndexOutput::default();
49        }
50
51        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52        if !success {
53            self.do_abort().await;
54            return IndexOutput::default();
55        }
56
57        self.do_prune_intm_sst_dir().await;
58        output.file_size = self.do_finish_puffin_writer(writer).await;
59        output.version = self.index_version;
60        output
61    }
62
63    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
64        let puffin_manager = self.puffin_manager.clone()?;
65
66        let err = match puffin_manager
67            .writer(&RegionIndexId::new(
68                RegionFileId::new(self.region_id, self.file_id),
69                self.index_version,
70            ))
71            .await
72        {
73            Ok(writer) => return Some(writer),
74            Err(err) => err,
75        };
76
77        if cfg!(any(test, feature = "test")) {
78            panic!(
79                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
80                self.region_id, self.file_id, err
81            );
82        } else {
83            warn!(
84                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
85                self.region_id, self.file_id,
86            );
87        }
88
89        None
90    }
91
92    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
93        let err = match writer.finish().await {
94            Ok(size) => return size,
95            Err(err) => err,
96        };
97
98        if cfg!(any(test, feature = "test")) {
99            panic!(
100                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
101                self.region_id, self.file_id, err
102            );
103        } else {
104            warn!(
105                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
106                self.region_id, self.file_id,
107            );
108        }
109
110        0
111    }
112
113    /// Returns false if the finish failed.
114    async fn do_finish_inverted_index(
115        &mut self,
116        puffin_writer: &mut SstPuffinWriter,
117        index_output: &mut IndexOutput,
118    ) -> bool {
119        let Some(mut indexer) = self.inverted_indexer.take() else {
120            return true;
121        };
122
123        let column_ids = indexer.column_ids().collect();
124        let err = match indexer.finish(puffin_writer).await {
125            Ok((row_count, byte_count)) => {
126                self.fill_inverted_index_output(
127                    &mut index_output.inverted_index,
128                    row_count,
129                    byte_count,
130                    column_ids,
131                );
132                return true;
133            }
134            Err(err) => err,
135        };
136
137        if cfg!(any(test, feature = "test")) {
138            panic!(
139                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
140                self.region_id, self.file_id, err
141            );
142        } else {
143            warn!(
144                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
145                self.region_id, self.file_id,
146            );
147        }
148
149        false
150    }
151
152    async fn do_finish_fulltext_index(
153        &mut self,
154        puffin_writer: &mut SstPuffinWriter,
155        index_output: &mut IndexOutput,
156    ) -> bool {
157        let Some(mut indexer) = self.fulltext_indexer.take() else {
158            return true;
159        };
160
161        let column_ids = indexer.column_ids().collect();
162        let err = match indexer.finish(puffin_writer).await {
163            Ok((row_count, byte_count)) => {
164                self.fill_fulltext_index_output(
165                    &mut index_output.fulltext_index,
166                    row_count,
167                    byte_count,
168                    column_ids,
169                );
170                return true;
171            }
172            Err(err) => err,
173        };
174
175        if cfg!(any(test, feature = "test")) {
176            panic!(
177                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
178                self.region_id, self.file_id, err
179            );
180        } else {
181            warn!(
182                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
183                self.region_id, self.file_id,
184            );
185        }
186
187        false
188    }
189
190    async fn do_finish_bloom_filter(
191        &mut self,
192        puffin_writer: &mut SstPuffinWriter,
193        index_output: &mut IndexOutput,
194    ) -> bool {
195        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
196            return true;
197        };
198
199        let column_ids = indexer.column_ids().collect();
200        let err = match indexer.finish(puffin_writer).await {
201            Ok((row_count, byte_count)) => {
202                self.fill_bloom_filter_output(
203                    &mut index_output.bloom_filter,
204                    row_count,
205                    byte_count,
206                    column_ids,
207                );
208                return true;
209            }
210            Err(err) => err,
211        };
212
213        if cfg!(any(test, feature = "test")) {
214            panic!(
215                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
216                self.region_id, self.file_id, err
217            );
218        } else {
219            warn!(
220                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
221                self.region_id, self.file_id,
222            );
223        }
224
225        false
226    }
227
228    fn fill_inverted_index_output(
229        &mut self,
230        output: &mut InvertedIndexOutput,
231        row_count: RowCount,
232        byte_count: ByteCount,
233        column_ids: Vec<ColumnId>,
234    ) {
235        debug!(
236            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
237            self.region_id, self.file_id, byte_count, row_count, column_ids
238        );
239
240        output.index_size = byte_count;
241        output.row_count = row_count;
242        output.columns = column_ids;
243    }
244
245    fn fill_fulltext_index_output(
246        &mut self,
247        output: &mut FulltextIndexOutput,
248        row_count: RowCount,
249        byte_count: ByteCount,
250        column_ids: Vec<ColumnId>,
251    ) {
252        debug!(
253            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
254            self.region_id, self.file_id, byte_count, row_count, column_ids
255        );
256
257        output.index_size = byte_count;
258        output.row_count = row_count;
259        output.columns = column_ids;
260    }
261
262    fn fill_bloom_filter_output(
263        &mut self,
264        output: &mut BloomFilterOutput,
265        row_count: RowCount,
266        byte_count: ByteCount,
267        column_ids: Vec<ColumnId>,
268    ) {
269        debug!(
270            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
271            self.region_id, self.file_id, byte_count, row_count, column_ids
272        );
273
274        output.index_size = byte_count;
275        output.row_count = row_count;
276        output.columns = column_ids;
277    }
278
279    pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
280        if let Some(manager) = self.intermediate_manager.take()
281            && let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await
282        {
283            warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
284        }
285    }
286}