1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::{RegionFileId, RegionIndexId};
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28 let mut output = IndexOutput::default();
29
30 let Some(mut writer) = self.build_puffin_writer().await else {
31 self.do_abort().await;
32 return output;
33 };
34
35 let success = self
36 .do_finish_inverted_index(&mut writer, &mut output)
37 .await;
38 if !success {
39 self.do_abort().await;
40 return IndexOutput::default();
41 }
42
43 let success = self
44 .do_finish_fulltext_index(&mut writer, &mut output)
45 .await;
46 if !success {
47 self.do_abort().await;
48 return IndexOutput::default();
49 }
50
51 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52 if !success {
53 self.do_abort().await;
54 return IndexOutput::default();
55 }
56
57 self.do_prune_intm_sst_dir().await;
58 output.file_size = self.do_finish_puffin_writer(writer).await;
59 output.version = self.index_version;
60 output
61 }
62
63 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
64 let puffin_manager = self.puffin_manager.clone()?;
65
66 let err = match puffin_manager
67 .writer(&RegionIndexId::new(
68 RegionFileId::new(self.region_id, self.file_id),
69 self.index_version,
70 ))
71 .await
72 {
73 Ok(writer) => return Some(writer),
74 Err(err) => err,
75 };
76
77 if cfg!(any(test, feature = "test")) {
78 panic!(
79 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
80 self.region_id, self.file_id, err
81 );
82 } else {
83 warn!(
84 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
85 self.region_id, self.file_id,
86 );
87 }
88
89 None
90 }
91
92 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
93 let err = match writer.finish().await {
94 Ok(size) => return size,
95 Err(err) => err,
96 };
97
98 if cfg!(any(test, feature = "test")) {
99 panic!(
100 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
101 self.region_id, self.file_id, err
102 );
103 } else {
104 warn!(
105 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
106 self.region_id, self.file_id,
107 );
108 }
109
110 0
111 }
112
113 async fn do_finish_inverted_index(
115 &mut self,
116 puffin_writer: &mut SstPuffinWriter,
117 index_output: &mut IndexOutput,
118 ) -> bool {
119 let Some(mut indexer) = self.inverted_indexer.take() else {
120 return true;
121 };
122
123 let column_ids = indexer.column_ids().collect();
124 let err = match indexer.finish(puffin_writer).await {
125 Ok((row_count, byte_count)) => {
126 self.fill_inverted_index_output(
127 &mut index_output.inverted_index,
128 row_count,
129 byte_count,
130 column_ids,
131 );
132 return true;
133 }
134 Err(err) => err,
135 };
136
137 if cfg!(any(test, feature = "test")) {
138 panic!(
139 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
140 self.region_id, self.file_id, err
141 );
142 } else {
143 warn!(
144 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
145 self.region_id, self.file_id,
146 );
147 }
148
149 false
150 }
151
152 async fn do_finish_fulltext_index(
153 &mut self,
154 puffin_writer: &mut SstPuffinWriter,
155 index_output: &mut IndexOutput,
156 ) -> bool {
157 let Some(mut indexer) = self.fulltext_indexer.take() else {
158 return true;
159 };
160
161 let column_ids = indexer.column_ids().collect();
162 let err = match indexer.finish(puffin_writer).await {
163 Ok((row_count, byte_count)) => {
164 self.fill_fulltext_index_output(
165 &mut index_output.fulltext_index,
166 row_count,
167 byte_count,
168 column_ids,
169 );
170 return true;
171 }
172 Err(err) => err,
173 };
174
175 if cfg!(any(test, feature = "test")) {
176 panic!(
177 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
178 self.region_id, self.file_id, err
179 );
180 } else {
181 warn!(
182 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
183 self.region_id, self.file_id,
184 );
185 }
186
187 false
188 }
189
190 async fn do_finish_bloom_filter(
191 &mut self,
192 puffin_writer: &mut SstPuffinWriter,
193 index_output: &mut IndexOutput,
194 ) -> bool {
195 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
196 return true;
197 };
198
199 let column_ids = indexer.column_ids().collect();
200 let err = match indexer.finish(puffin_writer).await {
201 Ok((row_count, byte_count)) => {
202 self.fill_bloom_filter_output(
203 &mut index_output.bloom_filter,
204 row_count,
205 byte_count,
206 column_ids,
207 );
208 return true;
209 }
210 Err(err) => err,
211 };
212
213 if cfg!(any(test, feature = "test")) {
214 panic!(
215 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
216 self.region_id, self.file_id, err
217 );
218 } else {
219 warn!(
220 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
221 self.region_id, self.file_id,
222 );
223 }
224
225 false
226 }
227
228 fn fill_inverted_index_output(
229 &mut self,
230 output: &mut InvertedIndexOutput,
231 row_count: RowCount,
232 byte_count: ByteCount,
233 column_ids: Vec<ColumnId>,
234 ) {
235 debug!(
236 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
237 self.region_id, self.file_id, byte_count, row_count, column_ids
238 );
239
240 output.index_size = byte_count;
241 output.row_count = row_count;
242 output.columns = column_ids;
243 }
244
245 fn fill_fulltext_index_output(
246 &mut self,
247 output: &mut FulltextIndexOutput,
248 row_count: RowCount,
249 byte_count: ByteCount,
250 column_ids: Vec<ColumnId>,
251 ) {
252 debug!(
253 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
254 self.region_id, self.file_id, byte_count, row_count, column_ids
255 );
256
257 output.index_size = byte_count;
258 output.row_count = row_count;
259 output.columns = column_ids;
260 }
261
262 fn fill_bloom_filter_output(
263 &mut self,
264 output: &mut BloomFilterOutput,
265 row_count: RowCount,
266 byte_count: ByteCount,
267 column_ids: Vec<ColumnId>,
268 ) {
269 debug!(
270 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
271 self.region_id, self.file_id, byte_count, row_count, column_ids
272 );
273
274 output.index_size = byte_count;
275 output.row_count = row_count;
276 output.columns = column_ids;
277 }
278
279 pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
280 if let Some(manager) = self.intermediate_manager.take()
281 && let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await
282 {
283 warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
284 }
285 }
286}