1use common_telemetry::warn;
16use datatypes::arrow::record_batch::RecordBatch;
17
18use crate::read::Batch;
19use crate::sst::index::Indexer;
20
21impl Indexer {
22 pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
23 if batch.is_empty() {
24 return;
25 }
26
27 if !self.do_update_inverted_index(batch).await {
28 self.do_abort().await;
29 }
30 if !self.do_update_fulltext_index(batch).await {
31 self.do_abort().await;
32 }
33 if !self.do_update_bloom_filter(batch).await {
34 self.do_abort().await;
35 }
36 #[cfg(feature = "vector_index")]
37 if !self.do_update_vector_index(batch).await {
38 self.do_abort().await;
39 }
40 }
41
42 async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
44 let Some(creator) = self.inverted_indexer.as_mut() else {
45 return true;
46 };
47
48 let Err(err) = creator.update(batch).await else {
49 return true;
50 };
51
52 if cfg!(any(test, feature = "test")) {
53 panic!(
54 "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}",
55 self.region_id, self.file_id, err
56 );
57 } else {
58 warn!(
59 err; "Failed to update inverted index, region_id: {}, file_id: {}",
60 self.region_id, self.file_id,
61 );
62 }
63
64 false
65 }
66
67 async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
69 let Some(creator) = self.fulltext_indexer.as_mut() else {
70 return true;
71 };
72
73 let Err(err) = creator.update(batch).await else {
74 return true;
75 };
76
77 if cfg!(any(test, feature = "test")) {
78 panic!(
79 "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}",
80 self.region_id, self.file_id, err
81 );
82 } else {
83 warn!(
84 err; "Failed to update full-text index, region_id: {}, file_id: {}",
85 self.region_id, self.file_id,
86 );
87 }
88
89 false
90 }
91
92 async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
94 let Some(creator) = self.bloom_filter_indexer.as_mut() else {
95 return true;
96 };
97
98 let Err(err) = creator.update(batch).await else {
99 return true;
100 };
101
102 if cfg!(any(test, feature = "test")) {
103 panic!(
104 "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}",
105 self.region_id, self.file_id, err
106 );
107 } else {
108 warn!(
109 err; "Failed to update bloom filter, region_id: {}, file_id: {}",
110 self.region_id, self.file_id,
111 );
112 }
113
114 false
115 }
116
117 #[cfg(feature = "vector_index")]
119 async fn do_update_vector_index(&mut self, batch: &mut Batch) -> bool {
120 let Some(creator) = self.vector_indexer.as_mut() else {
121 return true;
122 };
123
124 let Err(err) = creator.update(batch).await else {
125 return true;
126 };
127
128 if cfg!(any(test, feature = "test")) {
129 panic!(
130 "Failed to update vector index, region_id: {}, file_id: {}, err: {:?}",
131 self.region_id, self.file_id, err
132 );
133 } else {
134 warn!(
135 err; "Failed to update vector index, region_id: {}, file_id: {}",
136 self.region_id, self.file_id,
137 );
138 }
139
140 false
141 }
142
143 pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
144 if batch.num_rows() == 0 {
145 return;
146 }
147
148 if !self.do_update_flat_inverted_index(batch).await {
149 self.do_abort().await;
150 }
151 if !self.do_update_flat_fulltext_index(batch).await {
152 self.do_abort().await;
153 }
154 if !self.do_update_flat_bloom_filter(batch).await {
155 self.do_abort().await;
156 }
157 #[cfg(feature = "vector_index")]
158 if !self.do_update_flat_vector_index(batch).await {
159 self.do_abort().await;
160 }
161 }
162
163 async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool {
165 let Some(creator) = self.inverted_indexer.as_mut() else {
166 return true;
167 };
168
169 let Err(err) = creator.update_flat(batch).await else {
170 return true;
171 };
172
173 if cfg!(any(test, feature = "test")) {
174 panic!(
175 "Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}",
176 self.region_id, self.file_id, err
177 );
178 } else {
179 warn!(
180 err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}",
181 self.region_id, self.file_id,
182 );
183 }
184
185 false
186 }
187
188 async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool {
190 let Some(creator) = self.fulltext_indexer.as_mut() else {
191 return true;
192 };
193
194 let Err(err) = creator.update_flat(batch).await else {
195 return true;
196 };
197
198 if cfg!(any(test, feature = "test")) {
199 panic!(
200 "Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}",
201 self.region_id, self.file_id, err
202 );
203 } else {
204 warn!(
205 err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}",
206 self.region_id, self.file_id,
207 );
208 }
209
210 false
211 }
212
213 async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool {
215 let Some(creator) = self.bloom_filter_indexer.as_mut() else {
216 return true;
217 };
218
219 let Err(err) = creator.update_flat(batch).await else {
220 return true;
221 };
222
223 if cfg!(any(test, feature = "test")) {
224 panic!(
225 "Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}",
226 self.region_id, self.file_id, err
227 );
228 } else {
229 warn!(
230 err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}",
231 self.region_id, self.file_id,
232 );
233 }
234
235 false
236 }
237
238 #[cfg(feature = "vector_index")]
240 async fn do_update_flat_vector_index(&mut self, batch: &RecordBatch) -> bool {
241 let Some(creator) = self.vector_indexer.as_mut() else {
242 return true;
243 };
244
245 let Err(err) = creator.update_flat(batch).await else {
246 return true;
247 };
248
249 if cfg!(any(test, feature = "test")) {
250 panic!(
251 "Failed to update vector index with flat format, region_id: {}, file_id: {}, err: {:?}",
252 self.region_id, self.file_id, err
253 );
254 } else {
255 warn!(
256 err; "Failed to update vector index with flat format, region_id: {}, file_id: {}",
257 self.region_id, self.file_id,
258 );
259 }
260
261 false
262 }
263}