mito2/sst/index/indexer/
update.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16use datatypes::arrow::record_batch::RecordBatch;
17
18use crate::read::Batch;
19use crate::sst::index::Indexer;
20
21impl Indexer {
22    pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
23        if batch.is_empty() {
24            return;
25        }
26
27        if !self.do_update_inverted_index(batch).await {
28            self.do_abort().await;
29        }
30        if !self.do_update_fulltext_index(batch).await {
31            self.do_abort().await;
32        }
33        if !self.do_update_bloom_filter(batch).await {
34            self.do_abort().await;
35        }
36    }
37
38    /// Returns false if the update failed.
39    async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
40        let Some(creator) = self.inverted_indexer.as_mut() else {
41            return true;
42        };
43
44        let Err(err) = creator.update(batch).await else {
45            return true;
46        };
47
48        if cfg!(any(test, feature = "test")) {
49            panic!(
50                "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}",
51                self.region_id, self.file_id, err
52            );
53        } else {
54            warn!(
55                err; "Failed to update inverted index, region_id: {}, file_id: {}",
56                self.region_id, self.file_id,
57            );
58        }
59
60        false
61    }
62
63    /// Returns false if the update failed.
64    async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
65        let Some(creator) = self.fulltext_indexer.as_mut() else {
66            return true;
67        };
68
69        let Err(err) = creator.update(batch).await else {
70            return true;
71        };
72
73        if cfg!(any(test, feature = "test")) {
74            panic!(
75                "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}",
76                self.region_id, self.file_id, err
77            );
78        } else {
79            warn!(
80                err; "Failed to update full-text index, region_id: {}, file_id: {}",
81                self.region_id, self.file_id,
82            );
83        }
84
85        false
86    }
87
88    /// Returns false if the update failed.
89    async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
90        let Some(creator) = self.bloom_filter_indexer.as_mut() else {
91            return true;
92        };
93
94        let Err(err) = creator.update(batch).await else {
95            return true;
96        };
97
98        if cfg!(any(test, feature = "test")) {
99            panic!(
100                "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}",
101                self.region_id, self.file_id, err
102            );
103        } else {
104            warn!(
105                err; "Failed to update bloom filter, region_id: {}, file_id: {}",
106                self.region_id, self.file_id,
107            );
108        }
109
110        false
111    }
112
113    pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
114        if batch.num_rows() == 0 {
115            return;
116        }
117
118        if !self.do_update_flat_inverted_index(batch).await {
119            self.do_abort().await;
120        }
121        if !self.do_update_flat_fulltext_index(batch).await {
122            self.do_abort().await;
123        }
124        if !self.do_update_flat_bloom_filter(batch).await {
125            self.do_abort().await;
126        }
127    }
128
129    /// Returns false if the update failed.
130    async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool {
131        let Some(creator) = self.inverted_indexer.as_mut() else {
132            return true;
133        };
134
135        let Err(err) = creator.update_flat(batch).await else {
136            return true;
137        };
138
139        if cfg!(any(test, feature = "test")) {
140            panic!(
141                "Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}",
142                self.region_id, self.file_id, err
143            );
144        } else {
145            warn!(
146                err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}",
147                self.region_id, self.file_id,
148            );
149        }
150
151        false
152    }
153
154    /// Returns false if the update failed.
155    async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool {
156        let Some(creator) = self.fulltext_indexer.as_mut() else {
157            return true;
158        };
159
160        let Err(err) = creator.update_flat(batch).await else {
161            return true;
162        };
163
164        if cfg!(any(test, feature = "test")) {
165            panic!(
166                "Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}",
167                self.region_id, self.file_id, err
168            );
169        } else {
170            warn!(
171                err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}",
172                self.region_id, self.file_id,
173            );
174        }
175
176        false
177    }
178
179    /// Returns false if the update failed.
180    async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool {
181        let Some(creator) = self.bloom_filter_indexer.as_mut() else {
182            return true;
183        };
184
185        let Err(err) = creator.update_flat(batch).await else {
186            return true;
187        };
188
189        if cfg!(any(test, feature = "test")) {
190            panic!(
191                "Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}",
192                self.region_id, self.file_id, err
193            );
194        } else {
195            warn!(
196                err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}",
197                self.region_id, self.file_id,
198            );
199        }
200
201        false
202    }
203}