mito2/sst/index/inverted_index/
applier.rs1pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::inverted_index::format::reader::InvertedIndexBlobReader;
23use index::inverted_index::search::index_apply::{
24 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
25};
26use index::inverted_index::search::predicate::Predicate;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::storage::{ColumnId, RegionId};
32
33use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
34use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
35use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
36use crate::cache::index::result_cache::PredicateKey;
37use crate::error::{
38 ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
39};
40use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
41use crate::sst::file::FileId;
42use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
43use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
44use crate::sst::index::TYPE_INVERTED_INDEX;
45
46pub(crate) struct InvertedIndexApplier {
49 region_dir: String,
51
52 region_id: RegionId,
54
55 store: ObjectStore,
57
58 file_cache: Option<FileCacheRef>,
60
61 index_applier: Box<dyn IndexApplier>,
64
65 puffin_manager_factory: PuffinManagerFactory,
67
68 inverted_index_cache: Option<InvertedIndexCacheRef>,
70
71 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
73
74 predicate_key: PredicateKey,
76}
77
78pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
79
80impl InvertedIndexApplier {
81 pub fn new(
83 region_dir: String,
84 region_id: RegionId,
85 store: ObjectStore,
86 index_applier: Box<dyn IndexApplier>,
87 puffin_manager_factory: PuffinManagerFactory,
88 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
89 ) -> Self {
90 INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
91
92 Self {
93 region_dir,
94 region_id,
95 store,
96 file_cache: None,
97 index_applier,
98 puffin_manager_factory,
99 inverted_index_cache: None,
100 puffin_metadata_cache: None,
101 predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
102 }
103 }
104
105 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107 self.file_cache = file_cache;
108 self
109 }
110
111 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
113 self.inverted_index_cache = index_cache;
114 self
115 }
116
117 pub fn with_puffin_metadata_cache(
119 mut self,
120 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
121 ) -> Self {
122 self.puffin_metadata_cache = puffin_metadata_cache;
123 self
124 }
125
126 pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
128 let _timer = INDEX_APPLY_ELAPSED
129 .with_label_values(&[TYPE_INVERTED_INDEX])
130 .start_timer();
131
132 let context = SearchContext {
133 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
135 };
136
137 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
138 Ok(Some(puffin_reader)) => puffin_reader,
139 other => {
140 if let Err(err) = other {
141 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
142 }
143 self.remote_blob_reader(file_id, file_size_hint).await?
144 }
145 };
146
147 if let Some(index_cache) = &self.inverted_index_cache {
148 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
149 let mut index_reader = CachedInvertedIndexBlobReader::new(
150 file_id,
151 blob_size,
152 InvertedIndexBlobReader::new(blob),
153 index_cache.clone(),
154 );
155 self.index_applier
156 .apply(context, &mut index_reader)
157 .await
158 .context(ApplyInvertedIndexSnafu)
159 } else {
160 let mut index_reader = InvertedIndexBlobReader::new(blob);
161 self.index_applier
162 .apply(context, &mut index_reader)
163 .await
164 .context(ApplyInvertedIndexSnafu)
165 }
166 }
167
168 async fn cached_blob_reader(
170 &self,
171 file_id: FileId,
172 file_size_hint: Option<u64>,
173 ) -> Result<Option<BlobReader>> {
174 let Some(file_cache) = &self.file_cache else {
175 return Ok(None);
176 };
177
178 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
179 if file_cache.get(index_key).await.is_none() {
180 return Ok(None);
181 };
182
183 let puffin_manager = self.puffin_manager_factory.build(
184 file_cache.local_store(),
185 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
186 );
187
188 let reader = puffin_manager
190 .reader(&file_id)
191 .await
192 .context(PuffinBuildReaderSnafu)?
193 .with_file_size_hint(file_size_hint)
194 .blob(INDEX_BLOB_TYPE)
195 .await
196 .context(PuffinReadBlobSnafu)?
197 .reader()
198 .await
199 .context(PuffinBuildReaderSnafu)?;
200 Ok(Some(reader))
201 }
202
203 async fn remote_blob_reader(
205 &self,
206 file_id: FileId,
207 file_size_hint: Option<u64>,
208 ) -> Result<BlobReader> {
209 let puffin_manager = self
210 .puffin_manager_factory
211 .build(
212 self.store.clone(),
213 RegionFilePathFactory::new(self.region_dir.clone()),
214 )
215 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
216
217 puffin_manager
218 .reader(&file_id)
219 .await
220 .context(PuffinBuildReaderSnafu)?
221 .with_file_size_hint(file_size_hint)
222 .blob(INDEX_BLOB_TYPE)
223 .await
224 .context(PuffinReadBlobSnafu)?
225 .reader()
226 .await
227 .context(PuffinBuildReaderSnafu)
228 }
229
230 pub fn predicate_key(&self) -> &PredicateKey {
232 &self.predicate_key
233 }
234}
235
236impl Drop for InvertedIndexApplier {
237 fn drop(&mut self) {
238 INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
239 }
240}
241
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;

    /// Writes the puffin file for `file_id` under `region_dir` in `store`.
    /// The file holds a single empty blob stored under the key `blob_type`.
    async fn write_puffin_with_empty_blob(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        region_dir: &str,
        file_id: FileId,
        blob_type: &str,
    ) {
        let manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(region_dir.to_string()),
        );
        let mut writer = manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Returns a mock applier that reports 100 bytes of memory usage.
    /// The caller still has to configure the `apply` expectation.
    fn mock_applier() -> MockIndexApplier {
        let mut mock = MockIndexApplier::new();
        mock.expect_memory_usage().returning(|| 100);
        mock
    }

    /// Canned search result returned by the mock in the happy-path test.
    fn canned_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_temp_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        write_puffin_with_empty_blob(&factory, &store, &region_dir, file_id, INDEX_BLOB_TYPE)
            .await;

        let mut mock = mock_applier();
        mock.expect_apply().returning(|_, _| Ok(canned_output()));

        let applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            store,
            Box::new(mock),
            factory,
            Default::default(),
        );
        let output = applier.apply(file_id, None).await.unwrap();
        assert_eq!(output, canned_output());
    }

    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_temp_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        write_puffin_with_empty_blob(&factory, &store, &region_dir, file_id, "invalid_blob_type")
            .await;

        let mut mock = mock_applier();
        // The index blob is missing, so the search must never run.
        mock.expect_apply().never();

        let applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            store,
            Box::new(mock),
            factory,
            Default::default(),
        );
        let err = applier.apply(file_id, None).await.unwrap_err();
        assert!(format!("{:?}", err).contains("Blob not found"));
    }
}