mito2/sst/index/indexer/
update.rs1use common_telemetry::warn;
16use datatypes::arrow::record_batch::RecordBatch;
17
18use crate::read::Batch;
19use crate::sst::index::Indexer;
20
21impl Indexer {
22 pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
23 if batch.is_empty() {
24 return;
25 }
26
27 if !self.do_update_inverted_index(batch).await {
28 self.do_abort().await;
29 }
30 if !self.do_update_fulltext_index(batch).await {
31 self.do_abort().await;
32 }
33 if !self.do_update_bloom_filter(batch).await {
34 self.do_abort().await;
35 }
36 }
37
38 async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
40 let Some(creator) = self.inverted_indexer.as_mut() else {
41 return true;
42 };
43
44 let Err(err) = creator.update(batch).await else {
45 return true;
46 };
47
48 if cfg!(any(test, feature = "test")) {
49 panic!(
50 "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}",
51 self.region_id, self.file_id, err
52 );
53 } else {
54 warn!(
55 err; "Failed to update inverted index, region_id: {}, file_id: {}",
56 self.region_id, self.file_id,
57 );
58 }
59
60 false
61 }
62
63 async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
65 let Some(creator) = self.fulltext_indexer.as_mut() else {
66 return true;
67 };
68
69 let Err(err) = creator.update(batch).await else {
70 return true;
71 };
72
73 if cfg!(any(test, feature = "test")) {
74 panic!(
75 "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}",
76 self.region_id, self.file_id, err
77 );
78 } else {
79 warn!(
80 err; "Failed to update full-text index, region_id: {}, file_id: {}",
81 self.region_id, self.file_id,
82 );
83 }
84
85 false
86 }
87
88 async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
90 let Some(creator) = self.bloom_filter_indexer.as_mut() else {
91 return true;
92 };
93
94 let Err(err) = creator.update(batch).await else {
95 return true;
96 };
97
98 if cfg!(any(test, feature = "test")) {
99 panic!(
100 "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}",
101 self.region_id, self.file_id, err
102 );
103 } else {
104 warn!(
105 err; "Failed to update bloom filter, region_id: {}, file_id: {}",
106 self.region_id, self.file_id,
107 );
108 }
109
110 false
111 }
112
113 pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
114 if batch.num_rows() == 0 {
115 return;
116 }
117
118 if !self.do_update_flat_inverted_index(batch).await {
119 self.do_abort().await;
120 }
121 if !self.do_update_flat_fulltext_index(batch).await {
122 self.do_abort().await;
123 }
124 if !self.do_update_flat_bloom_filter(batch).await {
125 self.do_abort().await;
126 }
127 }
128
129 async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool {
131 let Some(creator) = self.inverted_indexer.as_mut() else {
132 return true;
133 };
134
135 let Err(err) = creator.update_flat(batch).await else {
136 return true;
137 };
138
139 if cfg!(any(test, feature = "test")) {
140 panic!(
141 "Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}",
142 self.region_id, self.file_id, err
143 );
144 } else {
145 warn!(
146 err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}",
147 self.region_id, self.file_id,
148 );
149 }
150
151 false
152 }
153
154 async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool {
156 let Some(creator) = self.fulltext_indexer.as_mut() else {
157 return true;
158 };
159
160 let Err(err) = creator.update_flat(batch).await else {
161 return true;
162 };
163
164 if cfg!(any(test, feature = "test")) {
165 panic!(
166 "Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}",
167 self.region_id, self.file_id, err
168 );
169 } else {
170 warn!(
171 err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}",
172 self.region_id, self.file_id,
173 );
174 }
175
176 false
177 }
178
179 async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool {
181 let Some(creator) = self.bloom_filter_indexer.as_mut() else {
182 return true;
183 };
184
185 let Err(err) = creator.update_flat(batch).await else {
186 return true;
187 };
188
189 if cfg!(any(test, feature = "test")) {
190 panic!(
191 "Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}",
192 self.region_id, self.file_id, err
193 );
194 } else {
195 warn!(
196 err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}",
197 self.region_id, self.file_id,
198 );
199 }
200
201 false
202 }
203}