1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17
18use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
19use crate::sst::index::fulltext_index::creator::FulltextIndexer;
20use crate::sst::index::inverted_index::creator::InvertedIndexer;
21use crate::sst::index::puffin_manager::SstPuffinWriter;
22use crate::sst::index::statistics::{ByteCount, RowCount};
23use crate::sst::index::{
24 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
25};
26
27impl Indexer {
28 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
29 let mut output = IndexOutput::default();
30
31 let Some(mut writer) = self.build_puffin_writer().await else {
32 self.do_abort().await;
33 return output;
34 };
35
36 let success = self
37 .do_finish_inverted_index(&mut writer, &mut output)
38 .await;
39 if !success {
40 self.do_abort().await;
41 return IndexOutput::default();
42 }
43
44 let success = self
45 .do_finish_fulltext_index(&mut writer, &mut output)
46 .await;
47 if !success {
48 self.do_abort().await;
49 return IndexOutput::default();
50 }
51
52 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
53 if !success {
54 self.do_abort().await;
55 return IndexOutput::default();
56 }
57
58 output.file_size = self.do_finish_puffin_writer(writer).await;
59 output
60 }
61
62 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
63 let puffin_manager = self.puffin_manager.take()?;
64
65 let err = match puffin_manager.writer(&self.file_id).await {
66 Ok(writer) => return Some(writer),
67 Err(err) => err,
68 };
69
70 if cfg!(any(test, feature = "test")) {
71 panic!(
72 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
73 self.region_id, self.file_id, err
74 );
75 } else {
76 warn!(
77 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
78 self.region_id, self.file_id,
79 );
80 }
81
82 None
83 }
84
85 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
86 let err = match writer.finish().await {
87 Ok(size) => return size,
88 Err(err) => err,
89 };
90
91 if cfg!(any(test, feature = "test")) {
92 panic!(
93 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
94 self.region_id, self.file_id, err
95 );
96 } else {
97 warn!(
98 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
99 self.region_id, self.file_id,
100 );
101 }
102
103 0
104 }
105
106 async fn do_finish_inverted_index(
108 &mut self,
109 puffin_writer: &mut SstPuffinWriter,
110 index_output: &mut IndexOutput,
111 ) -> bool {
112 let Some(mut indexer) = self.inverted_indexer.take() else {
113 return true;
114 };
115
116 let err = match indexer.finish(puffin_writer).await {
117 Ok((row_count, byte_count)) => {
118 self.fill_inverted_index_output(
119 &mut index_output.inverted_index,
120 row_count,
121 byte_count,
122 &indexer,
123 );
124 return true;
125 }
126 Err(err) => err,
127 };
128
129 if cfg!(any(test, feature = "test")) {
130 panic!(
131 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
132 self.region_id, self.file_id, err
133 );
134 } else {
135 warn!(
136 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
137 self.region_id, self.file_id,
138 );
139 }
140
141 false
142 }
143
144 async fn do_finish_fulltext_index(
145 &mut self,
146 puffin_writer: &mut SstPuffinWriter,
147 index_output: &mut IndexOutput,
148 ) -> bool {
149 let Some(mut indexer) = self.fulltext_indexer.take() else {
150 return true;
151 };
152
153 let err = match indexer.finish(puffin_writer).await {
154 Ok((row_count, byte_count)) => {
155 self.fill_fulltext_index_output(
156 &mut index_output.fulltext_index,
157 row_count,
158 byte_count,
159 &indexer,
160 );
161 return true;
162 }
163 Err(err) => err,
164 };
165
166 if cfg!(any(test, feature = "test")) {
167 panic!(
168 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
169 self.region_id, self.file_id, err
170 );
171 } else {
172 warn!(
173 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
174 self.region_id, self.file_id,
175 );
176 }
177
178 false
179 }
180
181 async fn do_finish_bloom_filter(
182 &mut self,
183 puffin_writer: &mut SstPuffinWriter,
184 index_output: &mut IndexOutput,
185 ) -> bool {
186 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
187 return true;
188 };
189
190 let err = match indexer.finish(puffin_writer).await {
191 Ok((row_count, byte_count)) => {
192 self.fill_bloom_filter_output(
193 &mut index_output.bloom_filter,
194 row_count,
195 byte_count,
196 &indexer,
197 );
198 return true;
199 }
200 Err(err) => err,
201 };
202
203 if cfg!(any(test, feature = "test")) {
204 panic!(
205 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
206 self.region_id, self.file_id, err
207 );
208 } else {
209 warn!(
210 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
211 self.region_id, self.file_id,
212 );
213 }
214
215 false
216 }
217
218 fn fill_inverted_index_output(
219 &mut self,
220 output: &mut InvertedIndexOutput,
221 row_count: RowCount,
222 byte_count: ByteCount,
223 indexer: &InvertedIndexer,
224 ) {
225 debug!(
226 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
227 self.region_id, self.file_id, byte_count, row_count
228 );
229
230 output.index_size = byte_count;
231 output.row_count = row_count;
232 output.columns = indexer.column_ids().collect();
233 }
234
235 fn fill_fulltext_index_output(
236 &mut self,
237 output: &mut FulltextIndexOutput,
238 row_count: RowCount,
239 byte_count: ByteCount,
240 indexer: &FulltextIndexer,
241 ) {
242 debug!(
243 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
244 self.region_id, self.file_id, byte_count, row_count
245 );
246
247 output.index_size = byte_count;
248 output.row_count = row_count;
249 output.columns = indexer.column_ids().collect();
250 }
251
252 fn fill_bloom_filter_output(
253 &mut self,
254 output: &mut BloomFilterOutput,
255 row_count: RowCount,
256 byte_count: ByteCount,
257 indexer: &BloomFilterIndexer,
258 ) {
259 debug!(
260 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
261 self.region_id, self.file_id, byte_count, row_count
262 );
263
264 output.index_size = byte_count;
265 output.row_count = row_count;
266 output.columns = indexer.column_ids().collect();
267 }
268}