mito2/sst/index/indexer/finish.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::RegionFileId;
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23    BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28        let mut output = IndexOutput::default();
29
30        let Some(mut writer) = self.build_puffin_writer().await else {
31            self.do_abort().await;
32            return output;
33        };
34
35        let success = self
36            .do_finish_inverted_index(&mut writer, &mut output)
37            .await;
38        if !success {
39            self.do_abort().await;
40            return IndexOutput::default();
41        }
42
43        let success = self
44            .do_finish_fulltext_index(&mut writer, &mut output)
45            .await;
46        if !success {
47            self.do_abort().await;
48            return IndexOutput::default();
49        }
50
51        let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52        if !success {
53            self.do_abort().await;
54            return IndexOutput::default();
55        }
56
57        output.file_size = self.do_finish_puffin_writer(writer).await;
58        output
59    }
60
61    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
62        let puffin_manager = self.puffin_manager.take()?;
63
64        let err = match puffin_manager
65            .writer(&RegionFileId::new(self.region_id, self.file_id))
66            .await
67        {
68            Ok(writer) => return Some(writer),
69            Err(err) => err,
70        };
71
72        if cfg!(any(test, feature = "test")) {
73            panic!(
74                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
75                self.region_id, self.file_id, err
76            );
77        } else {
78            warn!(
79                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
80                self.region_id, self.file_id,
81            );
82        }
83
84        None
85    }
86
87    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
88        let err = match writer.finish().await {
89            Ok(size) => return size,
90            Err(err) => err,
91        };
92
93        if cfg!(any(test, feature = "test")) {
94            panic!(
95                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
96                self.region_id, self.file_id, err
97            );
98        } else {
99            warn!(
100                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
101                self.region_id, self.file_id,
102            );
103        }
104
105        0
106    }
107
108    /// Returns false if the finish failed.
109    async fn do_finish_inverted_index(
110        &mut self,
111        puffin_writer: &mut SstPuffinWriter,
112        index_output: &mut IndexOutput,
113    ) -> bool {
114        let Some(mut indexer) = self.inverted_indexer.take() else {
115            return true;
116        };
117
118        let column_ids = indexer.column_ids().collect();
119        let err = match indexer.finish(puffin_writer).await {
120            Ok((row_count, byte_count)) => {
121                self.fill_inverted_index_output(
122                    &mut index_output.inverted_index,
123                    row_count,
124                    byte_count,
125                    column_ids,
126                );
127                return true;
128            }
129            Err(err) => err,
130        };
131
132        if cfg!(any(test, feature = "test")) {
133            panic!(
134                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
135                self.region_id, self.file_id, err
136            );
137        } else {
138            warn!(
139                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
140                self.region_id, self.file_id,
141            );
142        }
143
144        false
145    }
146
147    async fn do_finish_fulltext_index(
148        &mut self,
149        puffin_writer: &mut SstPuffinWriter,
150        index_output: &mut IndexOutput,
151    ) -> bool {
152        let Some(mut indexer) = self.fulltext_indexer.take() else {
153            return true;
154        };
155
156        let column_ids = indexer.column_ids().collect();
157        let err = match indexer.finish(puffin_writer).await {
158            Ok((row_count, byte_count)) => {
159                self.fill_fulltext_index_output(
160                    &mut index_output.fulltext_index,
161                    row_count,
162                    byte_count,
163                    column_ids,
164                );
165                return true;
166            }
167            Err(err) => err,
168        };
169
170        if cfg!(any(test, feature = "test")) {
171            panic!(
172                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
173                self.region_id, self.file_id, err
174            );
175        } else {
176            warn!(
177                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
178                self.region_id, self.file_id,
179            );
180        }
181
182        false
183    }
184
185    async fn do_finish_bloom_filter(
186        &mut self,
187        puffin_writer: &mut SstPuffinWriter,
188        index_output: &mut IndexOutput,
189    ) -> bool {
190        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
191            return true;
192        };
193
194        let column_ids = indexer.column_ids().collect();
195        let err = match indexer.finish(puffin_writer).await {
196            Ok((row_count, byte_count)) => {
197                self.fill_bloom_filter_output(
198                    &mut index_output.bloom_filter,
199                    row_count,
200                    byte_count,
201                    column_ids,
202                );
203                return true;
204            }
205            Err(err) => err,
206        };
207
208        if cfg!(any(test, feature = "test")) {
209            panic!(
210                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
211                self.region_id, self.file_id, err
212            );
213        } else {
214            warn!(
215                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
216                self.region_id, self.file_id,
217            );
218        }
219
220        false
221    }
222
223    fn fill_inverted_index_output(
224        &mut self,
225        output: &mut InvertedIndexOutput,
226        row_count: RowCount,
227        byte_count: ByteCount,
228        column_ids: Vec<ColumnId>,
229    ) {
230        debug!(
231            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
232            self.region_id, self.file_id, byte_count, row_count, column_ids
233        );
234
235        output.index_size = byte_count;
236        output.row_count = row_count;
237        output.columns = column_ids;
238    }
239
240    fn fill_fulltext_index_output(
241        &mut self,
242        output: &mut FulltextIndexOutput,
243        row_count: RowCount,
244        byte_count: ByteCount,
245        column_ids: Vec<ColumnId>,
246    ) {
247        debug!(
248            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
249            self.region_id, self.file_id, byte_count, row_count, column_ids
250        );
251
252        output.index_size = byte_count;
253        output.row_count = row_count;
254        output.columns = column_ids;
255    }
256
257    fn fill_bloom_filter_output(
258        &mut self,
259        output: &mut BloomFilterOutput,
260        row_count: RowCount,
261        byte_count: ByteCount,
262        column_ids: Vec<ColumnId>,
263    ) {
264        debug!(
265            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
266            self.region_id, self.file_id, byte_count, row_count, column_ids
267        );
268
269        output.index_size = byte_count;
270        output.row_count = row_count;
271        output.columns = column_ids;
272    }
273}