1use common_telemetry::{debug, warn};
16use puffin::puffin_manager::{PuffinManager, PuffinWriter};
17use store_api::storage::ColumnId;
18
19use crate::sst::index::puffin_manager::SstPuffinWriter;
20use crate::sst::index::statistics::{ByteCount, RowCount};
21use crate::sst::index::{
22 BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
23};
24
25impl Indexer {
26 pub(crate) async fn do_finish(&mut self) -> IndexOutput {
27 let mut output = IndexOutput::default();
28
29 let Some(mut writer) = self.build_puffin_writer().await else {
30 self.do_abort().await;
31 return output;
32 };
33
34 let success = self
35 .do_finish_inverted_index(&mut writer, &mut output)
36 .await;
37 if !success {
38 self.do_abort().await;
39 return IndexOutput::default();
40 }
41
42 let success = self
43 .do_finish_fulltext_index(&mut writer, &mut output)
44 .await;
45 if !success {
46 self.do_abort().await;
47 return IndexOutput::default();
48 }
49
50 let success = self.do_finish_bloom_filter(&mut writer, &mut output).await;
51 if !success {
52 self.do_abort().await;
53 return IndexOutput::default();
54 }
55
56 output.file_size = self.do_finish_puffin_writer(writer).await;
57 output
58 }
59
60 async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
61 let puffin_manager = self.puffin_manager.take()?;
62
63 let err = match puffin_manager.writer(&self.file_id).await {
64 Ok(writer) => return Some(writer),
65 Err(err) => err,
66 };
67
68 if cfg!(any(test, feature = "test")) {
69 panic!(
70 "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
71 self.region_id, self.file_id, err
72 );
73 } else {
74 warn!(
75 err; "Failed to create puffin writer, region_id: {}, file_id: {}",
76 self.region_id, self.file_id,
77 );
78 }
79
80 None
81 }
82
83 async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
84 let err = match writer.finish().await {
85 Ok(size) => return size,
86 Err(err) => err,
87 };
88
89 if cfg!(any(test, feature = "test")) {
90 panic!(
91 "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
92 self.region_id, self.file_id, err
93 );
94 } else {
95 warn!(
96 err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
97 self.region_id, self.file_id,
98 );
99 }
100
101 0
102 }
103
104 async fn do_finish_inverted_index(
106 &mut self,
107 puffin_writer: &mut SstPuffinWriter,
108 index_output: &mut IndexOutput,
109 ) -> bool {
110 let Some(mut indexer) = self.inverted_indexer.take() else {
111 return true;
112 };
113
114 let column_ids = indexer.column_ids().collect();
115 let err = match indexer.finish(puffin_writer).await {
116 Ok((row_count, byte_count)) => {
117 self.fill_inverted_index_output(
118 &mut index_output.inverted_index,
119 row_count,
120 byte_count,
121 column_ids,
122 );
123 return true;
124 }
125 Err(err) => err,
126 };
127
128 if cfg!(any(test, feature = "test")) {
129 panic!(
130 "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
131 self.region_id, self.file_id, err
132 );
133 } else {
134 warn!(
135 err; "Failed to finish inverted index, region_id: {}, file_id: {}",
136 self.region_id, self.file_id,
137 );
138 }
139
140 false
141 }
142
143 async fn do_finish_fulltext_index(
144 &mut self,
145 puffin_writer: &mut SstPuffinWriter,
146 index_output: &mut IndexOutput,
147 ) -> bool {
148 let Some(mut indexer) = self.fulltext_indexer.take() else {
149 return true;
150 };
151
152 let column_ids = indexer.column_ids().collect();
153 let err = match indexer.finish(puffin_writer).await {
154 Ok((row_count, byte_count)) => {
155 self.fill_fulltext_index_output(
156 &mut index_output.fulltext_index,
157 row_count,
158 byte_count,
159 column_ids,
160 );
161 return true;
162 }
163 Err(err) => err,
164 };
165
166 if cfg!(any(test, feature = "test")) {
167 panic!(
168 "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
169 self.region_id, self.file_id, err
170 );
171 } else {
172 warn!(
173 err; "Failed to finish full-text index, region_id: {}, file_id: {}",
174 self.region_id, self.file_id,
175 );
176 }
177
178 false
179 }
180
181 async fn do_finish_bloom_filter(
182 &mut self,
183 puffin_writer: &mut SstPuffinWriter,
184 index_output: &mut IndexOutput,
185 ) -> bool {
186 let Some(mut indexer) = self.bloom_filter_indexer.take() else {
187 return true;
188 };
189
190 let column_ids = indexer.column_ids().collect();
191 let err = match indexer.finish(puffin_writer).await {
192 Ok((row_count, byte_count)) => {
193 self.fill_bloom_filter_output(
194 &mut index_output.bloom_filter,
195 row_count,
196 byte_count,
197 column_ids,
198 );
199 return true;
200 }
201 Err(err) => err,
202 };
203
204 if cfg!(any(test, feature = "test")) {
205 panic!(
206 "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
207 self.region_id, self.file_id, err
208 );
209 } else {
210 warn!(
211 err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
212 self.region_id, self.file_id,
213 );
214 }
215
216 false
217 }
218
219 fn fill_inverted_index_output(
220 &mut self,
221 output: &mut InvertedIndexOutput,
222 row_count: RowCount,
223 byte_count: ByteCount,
224 column_ids: Vec<ColumnId>,
225 ) {
226 debug!(
227 "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
228 self.region_id, self.file_id, byte_count, row_count, column_ids
229 );
230
231 output.index_size = byte_count;
232 output.row_count = row_count;
233 output.columns = column_ids;
234 }
235
236 fn fill_fulltext_index_output(
237 &mut self,
238 output: &mut FulltextIndexOutput,
239 row_count: RowCount,
240 byte_count: ByteCount,
241 column_ids: Vec<ColumnId>,
242 ) {
243 debug!(
244 "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
245 self.region_id, self.file_id, byte_count, row_count, column_ids
246 );
247
248 output.index_size = byte_count;
249 output.row_count = row_count;
250 output.columns = column_ids;
251 }
252
253 fn fill_bloom_filter_output(
254 &mut self,
255 output: &mut BloomFilterOutput,
256 row_count: RowCount,
257 byte_count: ByteCount,
258 column_ids: Vec<ColumnId>,
259 ) {
260 debug!(
261 "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
262 self.region_id, self.file_id, byte_count, row_count, column_ids
263 );
264
265 output.index_size = byte_count;
266 output.row_count = row_count;
267 output.columns = column_ids;
268 }
269}