1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::file::RegionFileId;
20use crate::sst::index::puffin_manager::SstPuffinWriter;
21use crate::sst::index::statistics::{ByteCount, RowCount};
22use crate::sst::index::{
23 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
24};
25
26impl Indexer {
27 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
28 let mut output = IndexOutput::default();
29
30 let Some(mut writer) = self.build_puffin_writer().await else {
31 self.do_abort().await;
32 return output;
33 };
34
35 let success = self
36 .do_finish_inverted_index(&mut writer, &mut output)
37 .await;
38 if !success {
39 self.do_abort().await;
40 return IndexOutput::default();
41 }
42
43 let success = self
44 .do_finish_fulltext_index(&mut writer, &mut output)
45 .await;
46 if !success {
47 self.do_abort().await;
48 return IndexOutput::default();
49 }
50
51 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
52 if !success {
53 self.do_abort().await;
54 return IndexOutput::default();
55 }
56
57 output.file_size = self.do_finish_puffin_writer(writer).await;
58 output
59 }
60
61 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
62 let puffin_manager = self.puffin_manager.take()?;
63
64 let err = match puffin_manager
65 .writer(&RegionFileId::new(self.region_id, self.file_id))
66 .await
67 {
68 Ok(writer) => return Some(writer),
69 Err(err) => err,
70 };
71
72 if cfg!(any(test, feature = "test")) {
73 panic!(
74 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
75 self.region_id, self.file_id, err
76 );
77 } else {
78 warn!(
79 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
80 self.region_id, self.file_id,
81 );
82 }
83
84 None
85 }
86
87 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
88 let err = match writer.finish().await {
89 Ok(size) => return size,
90 Err(err) => err,
91 };
92
93 if cfg!(any(test, feature = "test")) {
94 panic!(
95 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
96 self.region_id, self.file_id, err
97 );
98 } else {
99 warn!(
100 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
101 self.region_id, self.file_id,
102 );
103 }
104
105 0
106 }
107
108 async fn do_finish_inverted_index(
110 &mut self,
111 puffin_writer: &mut SstPuffinWriter,
112 index_output: &mut IndexOutput,
113 ) -> bool {
114 let Some(mut indexer) = self.inverted_indexer.take() else {
115 return true;
116 };
117
118 let column_ids = indexer.column_ids().collect();
119 let err = match indexer.finish(puffin_writer).await {
120 Ok((row_count, byte_count)) => {
121 self.fill_inverted_index_output(
122 &mut index_output.inverted_index,
123 row_count,
124 byte_count,
125 column_ids,
126 );
127 return true;
128 }
129 Err(err) => err,
130 };
131
132 if cfg!(any(test, feature = "test")) {
133 panic!(
134 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
135 self.region_id, self.file_id, err
136 );
137 } else {
138 warn!(
139 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
140 self.region_id, self.file_id,
141 );
142 }
143
144 false
145 }
146
147 async fn do_finish_fulltext_index(
148 &mut self,
149 puffin_writer: &mut SstPuffinWriter,
150 index_output: &mut IndexOutput,
151 ) -> bool {
152 let Some(mut indexer) = self.fulltext_indexer.take() else {
153 return true;
154 };
155
156 let column_ids = indexer.column_ids().collect();
157 let err = match indexer.finish(puffin_writer).await {
158 Ok((row_count, byte_count)) => {
159 self.fill_fulltext_index_output(
160 &mut index_output.fulltext_index,
161 row_count,
162 byte_count,
163 column_ids,
164 );
165 return true;
166 }
167 Err(err) => err,
168 };
169
170 if cfg!(any(test, feature = "test")) {
171 panic!(
172 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
173 self.region_id, self.file_id, err
174 );
175 } else {
176 warn!(
177 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
178 self.region_id, self.file_id,
179 );
180 }
181
182 false
183 }
184
185 async fn do_finish_bloom_filter(
186 &mut self,
187 puffin_writer: &mut SstPuffinWriter,
188 index_output: &mut IndexOutput,
189 ) -> bool {
190 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
191 return true;
192 };
193
194 let column_ids = indexer.column_ids().collect();
195 let err = match indexer.finish(puffin_writer).await {
196 Ok((row_count, byte_count)) => {
197 self.fill_bloom_filter_output(
198 &mut index_output.bloom_filter,
199 row_count,
200 byte_count,
201 column_ids,
202 );
203 return true;
204 }
205 Err(err) => err,
206 };
207
208 if cfg!(any(test, feature = "test")) {
209 panic!(
210 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
211 self.region_id, self.file_id, err
212 );
213 } else {
214 warn!(
215 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
216 self.region_id, self.file_id,
217 );
218 }
219
220 false
221 }
222
223 fn fill_inverted_index_output(
224 &mut self,
225 output: &mut InvertedIndexOutput,
226 row_count: RowCount,
227 byte_count: ByteCount,
228 column_ids: Vec<ColumnId>,
229 ) {
230 debug!(
231 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
232 self.region_id, self.file_id, byte_count, row_count, column_ids
233 );
234
235 output.index_size = byte_count;
236 output.row_count = row_count;
237 output.columns = column_ids;
238 }
239
240 fn fill_fulltext_index_output(
241 &mut self,
242 output: &mut FulltextIndexOutput,
243 row_count: RowCount,
244 byte_count: ByteCount,
245 column_ids: Vec<ColumnId>,
246 ) {
247 debug!(
248 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
249 self.region_id, self.file_id, byte_count, row_count, column_ids
250 );
251
252 output.index_size = byte_count;
253 output.row_count = row_count;
254 output.columns = column_ids;
255 }
256
257 fn fill_bloom_filter_output(
258 &mut self,
259 output: &mut BloomFilterOutput,
260 row_count: RowCount,
261 byte_count: ByteCount,
262 column_ids: Vec<ColumnId>,
263 ) {
264 debug!(
265 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
266 self.region_id, self.file_id, byte_count, row_count, column_ids
267 );
268
269 output.index_size = byte_count;
270 output.row_count = row_count;
271 output.columns = column_ids;
272 }
273}