1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::{RegionFileId, RegionIndexId};
20#[cfg(feature = "vector_index")]
21use crate::sst::index::VectorIndexOutput;
22use crate::sst::index::puffin_manager::SstPuffinWriter;
23use crate::sst::index::statistics::{ByteCount, RowCount};
24use crate::sst::index::{
25 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
26};
27
28impl Indexer {
29 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
30 let mut output = IndexOutput::default();
31
32 let Some(mut writer) = self.build_puffin_writer().await else {
33 self.do_abort().await;
34 return output;
35 };
36
37 let success = self
38 .do_finish_inverted_index(&mut writer, &mut output)
39 .await;
40 if !success {
41 self.do_abort().await;
42 return IndexOutput::default();
43 }
44
45 let success = self
46 .do_finish_fulltext_index(&mut writer, &mut output)
47 .await;
48 if !success {
49 self.do_abort().await;
50 return IndexOutput::default();
51 }
52
53 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
54 if !success {
55 self.do_abort().await;
56 return IndexOutput::default();
57 }
58
59 #[cfg(feature = "vector_index")]
60 {
61 let success = self.do_finish_vector_index(&mut writer, &mut output).await;
62 if !success {
63 self.do_abort().await;
64 return IndexOutput::default();
65 }
66 }
67
68 self.do_prune_intm_sst_dir().await;
69 output.file_size = self.do_finish_puffin_writer(writer).await;
70 output.version = self.index_version;
71 output
72 }
73
74 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
75 let puffin_manager = self.puffin_manager.clone()?;
76
77 let err = match puffin_manager
78 .writer(&RegionIndexId::new(
79 RegionFileId::new(self.region_id, self.file_id),
80 self.index_version,
81 ))
82 .await
83 {
84 Ok(writer) => return Some(writer),
85 Err(err) => err,
86 };
87
88 if cfg!(any(test, feature = "test")) {
89 panic!(
90 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
91 self.region_id, self.file_id, err
92 );
93 } else {
94 warn!(
95 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
96 self.region_id, self.file_id,
97 );
98 }
99
100 None
101 }
102
103 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
104 let err = match writer.finish().await {
105 Ok(size) => return size,
106 Err(err) => err,
107 };
108
109 if cfg!(any(test, feature = "test")) {
110 panic!(
111 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
112 self.region_id, self.file_id, err
113 );
114 } else {
115 warn!(
116 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
117 self.region_id, self.file_id,
118 );
119 }
120
121 0
122 }
123
124 async fn do_finish_inverted_index(
126 &mut self,
127 puffin_writer: &mut SstPuffinWriter,
128 index_output: &mut IndexOutput,
129 ) -> bool {
130 let Some(mut indexer) = self.inverted_indexer.take() else {
131 return true;
132 };
133
134 let column_ids = indexer.column_ids().collect();
135 let err = match indexer.finish(puffin_writer).await {
136 Ok((row_count, byte_count)) => {
137 self.fill_inverted_index_output(
138 &mut index_output.inverted_index,
139 row_count,
140 byte_count,
141 column_ids,
142 );
143 return true;
144 }
145 Err(err) => err,
146 };
147
148 if cfg!(any(test, feature = "test")) {
149 panic!(
150 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
151 self.region_id, self.file_id, err
152 );
153 } else {
154 warn!(
155 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
156 self.region_id, self.file_id,
157 );
158 }
159
160 false
161 }
162
163 async fn do_finish_fulltext_index(
164 &mut self,
165 puffin_writer: &mut SstPuffinWriter,
166 index_output: &mut IndexOutput,
167 ) -> bool {
168 let Some(mut indexer) = self.fulltext_indexer.take() else {
169 return true;
170 };
171
172 let column_ids = indexer.column_ids().collect();
173 let err = match indexer.finish(puffin_writer).await {
174 Ok((row_count, byte_count)) => {
175 self.fill_fulltext_index_output(
176 &mut index_output.fulltext_index,
177 row_count,
178 byte_count,
179 column_ids,
180 );
181 return true;
182 }
183 Err(err) => err,
184 };
185
186 if cfg!(any(test, feature = "test")) {
187 panic!(
188 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
189 self.region_id, self.file_id, err
190 );
191 } else {
192 warn!(
193 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
194 self.region_id, self.file_id,
195 );
196 }
197
198 false
199 }
200
201 async fn do_finish_bloom_filter(
202 &mut self,
203 puffin_writer: &mut SstPuffinWriter,
204 index_output: &mut IndexOutput,
205 ) -> bool {
206 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
207 return true;
208 };
209
210 let column_ids = indexer.column_ids().collect();
211 let err = match indexer.finish(puffin_writer).await {
212 Ok((row_count, byte_count)) => {
213 self.fill_bloom_filter_output(
214 &mut index_output.bloom_filter,
215 row_count,
216 byte_count,
217 column_ids,
218 );
219 return true;
220 }
221 Err(err) => err,
222 };
223
224 if cfg!(any(test, feature = "test")) {
225 panic!(
226 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
227 self.region_id, self.file_id, err
228 );
229 } else {
230 warn!(
231 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
232 self.region_id, self.file_id,
233 );
234 }
235
236 false
237 }
238
239 fn fill_inverted_index_output(
240 &mut self,
241 output: &mut InvertedIndexOutput,
242 row_count: RowCount,
243 byte_count: ByteCount,
244 column_ids: Vec<ColumnId>,
245 ) {
246 debug!(
247 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
248 self.region_id, self.file_id, byte_count, row_count, column_ids
249 );
250
251 output.index_size = byte_count;
252 output.row_count = row_count;
253 output.columns = column_ids;
254 }
255
256 fn fill_fulltext_index_output(
257 &mut self,
258 output: &mut FulltextIndexOutput,
259 row_count: RowCount,
260 byte_count: ByteCount,
261 column_ids: Vec<ColumnId>,
262 ) {
263 debug!(
264 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
265 self.region_id, self.file_id, byte_count, row_count, column_ids
266 );
267
268 output.index_size = byte_count;
269 output.row_count = row_count;
270 output.columns = column_ids;
271 }
272
273 fn fill_bloom_filter_output(
274 &mut self,
275 output: &mut BloomFilterOutput,
276 row_count: RowCount,
277 byte_count: ByteCount,
278 column_ids: Vec<ColumnId>,
279 ) {
280 debug!(
281 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
282 self.region_id, self.file_id, byte_count, row_count, column_ids
283 );
284
285 output.index_size = byte_count;
286 output.row_count = row_count;
287 output.columns = column_ids;
288 }
289
290 #[cfg(feature = "vector_index")]
291 async fn do_finish_vector_index(
292 &mut self,
293 puffin_writer: &mut SstPuffinWriter,
294 index_output: &mut IndexOutput,
295 ) -> bool {
296 let Some(mut indexer) = self.vector_indexer.take() else {
297 return true;
298 };
299
300 let column_ids = indexer.column_ids().collect();
301 let err = match indexer.finish(puffin_writer).await {
302 Ok((row_count, byte_count)) => {
303 self.fill_vector_index_output(
304 &mut index_output.vector_index,
305 row_count,
306 byte_count,
307 column_ids,
308 );
309 return true;
310 }
311 Err(err) => err,
312 };
313
314 if cfg!(any(test, feature = "test")) {
315 panic!(
316 "Failed to finish vector index, region_id: {}, file_id: {}, err: {:?}",
317 self.region_id, self.file_id, err
318 );
319 } else {
320 warn!(
321 err; "Failed to finish vector index, region_id: {}, file_id: {}",
322 self.region_id, self.file_id,
323 );
324 }
325
326 false
327 }
328
329 #[cfg(feature = "vector_index")]
330 fn fill_vector_index_output(
331 &mut self,
332 output: &mut VectorIndexOutput,
333 row_count: RowCount,
334 byte_count: ByteCount,
335 column_ids: Vec<ColumnId>,
336 ) {
337 debug!(
338 "Vector index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
339 self.region_id, self.file_id, byte_count, row_count, column_ids
340 );
341
342 output.index_size = byte_count;
343 output.row_count = row_count;
344 output.columns = column_ids;
345 }
346
347 pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
348 if let Some(manager) = self.intermediate_manager.take()
349 && let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await
350 {
351 warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
352 }
353 }
354}