mito2/sst/index/inverted_index/
applier.rs1pub mod builder;
16
17use std::sync::Arc;
18
19use common_base::range_read::RangeReader;
20use common_telemetry::warn;
21use index::inverted_index::format::reader::InvertedIndexBlobReader;
22use index::inverted_index::search::index_apply::{
23 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
24};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
34use crate::error::{
35 ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
36};
37use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
38use crate::sst::file::FileId;
39use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
40use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
41use crate::sst::index::TYPE_INVERTED_INDEX;
42
/// Applies inverted-index predicates to the index blob of an SST file.
///
/// The index blob is stored in a puffin file and is read from a local file cache
/// when one is configured and holds the file, otherwise from the remote object store.
pub(crate) struct InvertedIndexApplier {
    /// Root directory of the region; used to build the paths of remote index files.
    region_dir: String,

    /// Region ID; combined with the file ID to look up and locate cached index files.
    region_id: RegionId,

    /// Object store from which remote index files are read.
    store: ObjectStore,

    /// Optional local file cache that is consulted before the remote store.
    file_cache: Option<FileCacheRef>,

    /// Predicate applier that is run against the opened index reader.
    /// Its reported memory usage is accounted in `INDEX_APPLY_MEMORY_USAGE`.
    index_applier: Box<dyn IndexApplier>,

    /// Factory used to build the puffin managers that open index files.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional in-memory inverted index cache; when set, index reads are wrapped
    /// in a `CachedInvertedIndexBlobReader`.
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Optional puffin metadata cache; only handed to the remote puffin manager.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
71
/// Shared, reference-counted handle to an [`InvertedIndexApplier`].
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
73
74impl InvertedIndexApplier {
75 pub fn new(
77 region_dir: String,
78 region_id: RegionId,
79 store: ObjectStore,
80 index_applier: Box<dyn IndexApplier>,
81 puffin_manager_factory: PuffinManagerFactory,
82 ) -> Self {
83 INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
84
85 Self {
86 region_dir,
87 region_id,
88 store,
89 file_cache: None,
90 index_applier,
91 puffin_manager_factory,
92 inverted_index_cache: None,
93 puffin_metadata_cache: None,
94 }
95 }
96
97 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
99 self.file_cache = file_cache;
100 self
101 }
102
103 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
105 self.inverted_index_cache = index_cache;
106 self
107 }
108
109 pub fn with_puffin_metadata_cache(
111 mut self,
112 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
113 ) -> Self {
114 self.puffin_metadata_cache = puffin_metadata_cache;
115 self
116 }
117
118 pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
120 let _timer = INDEX_APPLY_ELAPSED
121 .with_label_values(&[TYPE_INVERTED_INDEX])
122 .start_timer();
123
124 let context = SearchContext {
125 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
127 };
128
129 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
130 Ok(Some(puffin_reader)) => puffin_reader,
131 other => {
132 if let Err(err) = other {
133 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
134 }
135 self.remote_blob_reader(file_id, file_size_hint).await?
136 }
137 };
138
139 if let Some(index_cache) = &self.inverted_index_cache {
140 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
141 let mut index_reader = CachedInvertedIndexBlobReader::new(
142 file_id,
143 blob_size,
144 InvertedIndexBlobReader::new(blob),
145 index_cache.clone(),
146 );
147 self.index_applier
148 .apply(context, &mut index_reader)
149 .await
150 .context(ApplyInvertedIndexSnafu)
151 } else {
152 let mut index_reader = InvertedIndexBlobReader::new(blob);
153 self.index_applier
154 .apply(context, &mut index_reader)
155 .await
156 .context(ApplyInvertedIndexSnafu)
157 }
158 }
159
160 async fn cached_blob_reader(
162 &self,
163 file_id: FileId,
164 file_size_hint: Option<u64>,
165 ) -> Result<Option<BlobReader>> {
166 let Some(file_cache) = &self.file_cache else {
167 return Ok(None);
168 };
169
170 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
171 if file_cache.get(index_key).await.is_none() {
172 return Ok(None);
173 };
174
175 let puffin_manager = self.puffin_manager_factory.build(
176 file_cache.local_store(),
177 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
178 );
179
180 let reader = puffin_manager
182 .reader(&file_id)
183 .await
184 .context(PuffinBuildReaderSnafu)?
185 .with_file_size_hint(file_size_hint)
186 .blob(INDEX_BLOB_TYPE)
187 .await
188 .context(PuffinReadBlobSnafu)?
189 .reader()
190 .await
191 .context(PuffinBuildReaderSnafu)?;
192 Ok(Some(reader))
193 }
194
195 async fn remote_blob_reader(
197 &self,
198 file_id: FileId,
199 file_size_hint: Option<u64>,
200 ) -> Result<BlobReader> {
201 let puffin_manager = self
202 .puffin_manager_factory
203 .build(
204 self.store.clone(),
205 RegionFilePathFactory::new(self.region_dir.clone()),
206 )
207 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
208
209 puffin_manager
210 .reader(&file_id)
211 .await
212 .context(PuffinBuildReaderSnafu)?
213 .with_file_size_hint(file_size_hint)
214 .blob(INDEX_BLOB_TYPE)
215 .await
216 .context(PuffinReadBlobSnafu)?
217 .reader()
218 .await
219 .context(PuffinBuildReaderSnafu)
220 }
221}
222
223impl Drop for InvertedIndexApplier {
224 fn drop(&mut self) {
225 INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
226 }
227}
228
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;

    /// Writes the puffin file of `file_id` under `region_dir`, containing a single
    /// empty blob of type `blob_type`.
    async fn write_single_blob_puffin(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        region_dir: &str,
        file_id: &FileId,
        blob_type: &'static str,
    ) {
        let manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(region_dir.to_string()),
        );
        let mut writer = manager.writer(file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// The output the mocked applier produces in the happy-path test.
    fn sample_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        write_single_blob_puffin(
            &puffin_manager_factory,
            &object_store,
            &region_dir,
            &file_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier
            .expect_apply()
            .returning(|_, _| Ok(sample_output()));

        let sst_index_applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
        );
        let output = sst_index_applier.apply(file_id, None).await.unwrap();
        assert_eq!(output, sample_output());
    }

    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        write_single_blob_puffin(
            &puffin_manager_factory,
            &object_store,
            &region_dir,
            &file_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier.expect_apply().never();

        let sst_index_applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
        );
        let err = sst_index_applier.apply(file_id, None).await.unwrap_err();
        assert!(format!("{err:?}").contains("Blob not found"));
    }
}