1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::RegionFileId;
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28 let mut output = IndexOutput::default();
29
30 let Some(mut writer) = self.build_puffin_writer().await else {
31 self.do_abort().await;
32 return output;
33 };
34
35 let success = self
36 .do_finish_inverted_index(&mut writer, &mut output)
37 .await;
38 if !success {
39 self.do_abort().await;
40 return IndexOutput::default();
41 }
42
43 let success = self
44 .do_finish_fulltext_index(&mut writer, &mut output)
45 .await;
46 if !success {
47 self.do_abort().await;
48 return IndexOutput::default();
49 }
50
51 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52 if !success {
53 self.do_abort().await;
54 return IndexOutput::default();
55 }
56
57 self.do_prune_intm_sst_dir().await;
58 output.file_size = self.do_finish_puffin_writer(writer).await;
59 output
60 }
61
62 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
63 let puffin_manager = self.puffin_manager.take()?;
64
65 let err = match puffin_manager
66 .writer(&RegionFileId::new(self.region_id, self.file_id))
67 .await
68 {
69 Ok(writer) => return Some(writer),
70 Err(err) => err,
71 };
72
73 if cfg!(any(test, feature = "test")) {
74 panic!(
75 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
76 self.region_id, self.file_id, err
77 );
78 } else {
79 warn!(
80 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
81 self.region_id, self.file_id,
82 );
83 }
84
85 None
86 }
87
88 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
89 let err = match writer.finish().await {
90 Ok(size) => return size,
91 Err(err) => err,
92 };
93
94 if cfg!(any(test, feature = "test")) {
95 panic!(
96 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
97 self.region_id, self.file_id, err
98 );
99 } else {
100 warn!(
101 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
102 self.region_id, self.file_id,
103 );
104 }
105
106 0
107 }
108
109 async fn do_finish_inverted_index(
111 &mut self,
112 puffin_writer: &mut SstPuffinWriter,
113 index_output: &mut IndexOutput,
114 ) -> bool {
115 let Some(mut indexer) = self.inverted_indexer.take() else {
116 return true;
117 };
118
119 let column_ids = indexer.column_ids().collect();
120 let err = match indexer.finish(puffin_writer).await {
121 Ok((row_count, byte_count)) => {
122 self.fill_inverted_index_output(
123 &mut index_output.inverted_index,
124 row_count,
125 byte_count,
126 column_ids,
127 );
128 return true;
129 }
130 Err(err) => err,
131 };
132
133 if cfg!(any(test, feature = "test")) {
134 panic!(
135 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
136 self.region_id, self.file_id, err
137 );
138 } else {
139 warn!(
140 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
141 self.region_id, self.file_id,
142 );
143 }
144
145 false
146 }
147
148 async fn do_finish_fulltext_index(
149 &mut self,
150 puffin_writer: &mut SstPuffinWriter,
151 index_output: &mut IndexOutput,
152 ) -> bool {
153 let Some(mut indexer) = self.fulltext_indexer.take() else {
154 return true;
155 };
156
157 let column_ids = indexer.column_ids().collect();
158 let err = match indexer.finish(puffin_writer).await {
159 Ok((row_count, byte_count)) => {
160 self.fill_fulltext_index_output(
161 &mut index_output.fulltext_index,
162 row_count,
163 byte_count,
164 column_ids,
165 );
166 return true;
167 }
168 Err(err) => err,
169 };
170
171 if cfg!(any(test, feature = "test")) {
172 panic!(
173 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
174 self.region_id, self.file_id, err
175 );
176 } else {
177 warn!(
178 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
179 self.region_id, self.file_id,
180 );
181 }
182
183 false
184 }
185
186 async fn do_finish_bloom_filter(
187 &mut self,
188 puffin_writer: &mut SstPuffinWriter,
189 index_output: &mut IndexOutput,
190 ) -> bool {
191 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
192 return true;
193 };
194
195 let column_ids = indexer.column_ids().collect();
196 let err = match indexer.finish(puffin_writer).await {
197 Ok((row_count, byte_count)) => {
198 self.fill_bloom_filter_output(
199 &mut index_output.bloom_filter,
200 row_count,
201 byte_count,
202 column_ids,
203 );
204 return true;
205 }
206 Err(err) => err,
207 };
208
209 if cfg!(any(test, feature = "test")) {
210 panic!(
211 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
212 self.region_id, self.file_id, err
213 );
214 } else {
215 warn!(
216 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
217 self.region_id, self.file_id,
218 );
219 }
220
221 false
222 }
223
224 fn fill_inverted_index_output(
225 &mut self,
226 output: &mut InvertedIndexOutput,
227 row_count: RowCount,
228 byte_count: ByteCount,
229 column_ids: Vec<ColumnId>,
230 ) {
231 debug!(
232 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
233 self.region_id, self.file_id, byte_count, row_count, column_ids
234 );
235
236 output.index_size = byte_count;
237 output.row_count = row_count;
238 output.columns = column_ids;
239 }
240
241 fn fill_fulltext_index_output(
242 &mut self,
243 output: &mut FulltextIndexOutput,
244 row_count: RowCount,
245 byte_count: ByteCount,
246 column_ids: Vec<ColumnId>,
247 ) {
248 debug!(
249 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
250 self.region_id, self.file_id, byte_count, row_count, column_ids
251 );
252
253 output.index_size = byte_count;
254 output.row_count = row_count;
255 output.columns = column_ids;
256 }
257
258 fn fill_bloom_filter_output(
259 &mut self,
260 output: &mut BloomFilterOutput,
261 row_count: RowCount,
262 byte_count: ByteCount,
263 column_ids: Vec<ColumnId>,
264 ) {
265 debug!(
266 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
267 self.region_id, self.file_id, byte_count, row_count, column_ids
268 );
269
270 output.index_size = byte_count;
271 output.row_count = row_count;
272 output.columns = column_ids;
273 }
274
275 pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
276 if let Some(manager) = self.intermediate_manager.take() {
277 if let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await {
278 warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
279 }
280 }
281 }
282}