mito2/sst/index/indexer/
update.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16use datatypes::arrow::record_batch::RecordBatch;
17
18use crate::read::Batch;
19use crate::sst::index::Indexer;
20
21impl Indexer {
22    pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
23        if batch.is_empty() {
24            return;
25        }
26
27        if !self.do_update_inverted_index(batch).await {
28            self.do_abort().await;
29        }
30        if !self.do_update_fulltext_index(batch).await {
31            self.do_abort().await;
32        }
33        if !self.do_update_bloom_filter(batch).await {
34            self.do_abort().await;
35        }
36        #[cfg(feature = "vector_index")]
37        if !self.do_update_vector_index(batch).await {
38            self.do_abort().await;
39        }
40    }
41
42    /// Returns false if the update failed.
43    async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
44        let Some(creator) = self.inverted_indexer.as_mut() else {
45            return true;
46        };
47
48        let Err(err) = creator.update(batch).await else {
49            return true;
50        };
51
52        if cfg!(any(test, feature = "test")) {
53            panic!(
54                "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}",
55                self.region_id, self.file_id, err
56            );
57        } else {
58            warn!(
59                err; "Failed to update inverted index, region_id: {}, file_id: {}",
60                self.region_id, self.file_id,
61            );
62        }
63
64        false
65    }
66
67    /// Returns false if the update failed.
68    async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
69        let Some(creator) = self.fulltext_indexer.as_mut() else {
70            return true;
71        };
72
73        let Err(err) = creator.update(batch).await else {
74            return true;
75        };
76
77        if cfg!(any(test, feature = "test")) {
78            panic!(
79                "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}",
80                self.region_id, self.file_id, err
81            );
82        } else {
83            warn!(
84                err; "Failed to update full-text index, region_id: {}, file_id: {}",
85                self.region_id, self.file_id,
86            );
87        }
88
89        false
90    }
91
92    /// Returns false if the update failed.
93    async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
94        let Some(creator) = self.bloom_filter_indexer.as_mut() else {
95            return true;
96        };
97
98        let Err(err) = creator.update(batch).await else {
99            return true;
100        };
101
102        if cfg!(any(test, feature = "test")) {
103            panic!(
104                "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}",
105                self.region_id, self.file_id, err
106            );
107        } else {
108            warn!(
109                err; "Failed to update bloom filter, region_id: {}, file_id: {}",
110                self.region_id, self.file_id,
111            );
112        }
113
114        false
115    }
116
117    /// Returns false if the update failed.
118    #[cfg(feature = "vector_index")]
119    async fn do_update_vector_index(&mut self, batch: &mut Batch) -> bool {
120        let Some(creator) = self.vector_indexer.as_mut() else {
121            return true;
122        };
123
124        let Err(err) = creator.update(batch).await else {
125            return true;
126        };
127
128        if cfg!(any(test, feature = "test")) {
129            panic!(
130                "Failed to update vector index, region_id: {}, file_id: {}, err: {:?}",
131                self.region_id, self.file_id, err
132            );
133        } else {
134            warn!(
135                err; "Failed to update vector index, region_id: {}, file_id: {}",
136                self.region_id, self.file_id,
137            );
138        }
139
140        false
141    }
142
143    pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
144        if batch.num_rows() == 0 {
145            return;
146        }
147
148        if !self.do_update_flat_inverted_index(batch).await {
149            self.do_abort().await;
150        }
151        if !self.do_update_flat_fulltext_index(batch).await {
152            self.do_abort().await;
153        }
154        if !self.do_update_flat_bloom_filter(batch).await {
155            self.do_abort().await;
156        }
157        #[cfg(feature = "vector_index")]
158        if !self.do_update_flat_vector_index(batch).await {
159            self.do_abort().await;
160        }
161    }
162
163    /// Returns false if the update failed.
164    async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool {
165        let Some(creator) = self.inverted_indexer.as_mut() else {
166            return true;
167        };
168
169        let Err(err) = creator.update_flat(batch).await else {
170            return true;
171        };
172
173        if cfg!(any(test, feature = "test")) {
174            panic!(
175                "Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}",
176                self.region_id, self.file_id, err
177            );
178        } else {
179            warn!(
180                err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}",
181                self.region_id, self.file_id,
182            );
183        }
184
185        false
186    }
187
188    /// Returns false if the update failed.
189    async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool {
190        let Some(creator) = self.fulltext_indexer.as_mut() else {
191            return true;
192        };
193
194        let Err(err) = creator.update_flat(batch).await else {
195            return true;
196        };
197
198        if cfg!(any(test, feature = "test")) {
199            panic!(
200                "Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}",
201                self.region_id, self.file_id, err
202            );
203        } else {
204            warn!(
205                err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}",
206                self.region_id, self.file_id,
207            );
208        }
209
210        false
211    }
212
213    /// Returns false if the update failed.
214    async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool {
215        let Some(creator) = self.bloom_filter_indexer.as_mut() else {
216            return true;
217        };
218
219        let Err(err) = creator.update_flat(batch).await else {
220            return true;
221        };
222
223        if cfg!(any(test, feature = "test")) {
224            panic!(
225                "Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}",
226                self.region_id, self.file_id, err
227            );
228        } else {
229            warn!(
230                err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}",
231                self.region_id, self.file_id,
232            );
233        }
234
235        false
236    }
237
238    /// Returns false if the update failed.
239    #[cfg(feature = "vector_index")]
240    async fn do_update_flat_vector_index(&mut self, batch: &RecordBatch) -> bool {
241        let Some(creator) = self.vector_indexer.as_mut() else {
242            return true;
243        };
244
245        let Err(err) = creator.update_flat(batch).await else {
246            return true;
247        };
248
249        if cfg!(any(test, feature = "test")) {
250            panic!(
251                "Failed to update vector index with flat format, region_id: {}, file_id: {}, err: {:?}",
252                self.region_id, self.file_id, err
253            );
254        } else {
255            warn!(
256                err; "Failed to update vector index with flat format, region_id: {}, file_id: {}",
257                self.region_id, self.file_id,
258            );
259        }
260
261        false
262    }
263}