mito2/sst/index/inverted_index/
applier.rs1pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::inverted_index::format::reader::InvertedIndexBlobReader;
23use index::inverted_index::search::index_apply::{
24 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
25};
26use index::inverted_index::search::predicate::Predicate;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
35use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
36use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39 ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
40};
41use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
42use crate::sst::file::RegionFileId;
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
45use crate::sst::index::TYPE_INVERTED_INDEX;
46
47pub(crate) struct InvertedIndexApplier {
50 table_dir: String,
52
53 path_type: PathType,
55
56 store: ObjectStore,
58
59 file_cache: Option<FileCacheRef>,
61
62 index_applier: Box<dyn IndexApplier>,
65
66 puffin_manager_factory: PuffinManagerFactory,
68
69 inverted_index_cache: Option<InvertedIndexCacheRef>,
71
72 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
74
75 predicate_key: PredicateKey,
77}
78
79pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
80
81impl InvertedIndexApplier {
82 pub fn new(
84 table_dir: String,
85 path_type: PathType,
86 store: ObjectStore,
87 index_applier: Box<dyn IndexApplier>,
88 puffin_manager_factory: PuffinManagerFactory,
89 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
90 ) -> Self {
91 INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
92
93 Self {
94 table_dir,
95 path_type,
96 store,
97 file_cache: None,
98 index_applier,
99 puffin_manager_factory,
100 inverted_index_cache: None,
101 puffin_metadata_cache: None,
102 predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
103 }
104 }
105
106 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
108 self.file_cache = file_cache;
109 self
110 }
111
112 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
114 self.inverted_index_cache = index_cache;
115 self
116 }
117
118 pub fn with_puffin_metadata_cache(
120 mut self,
121 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
122 ) -> Self {
123 self.puffin_metadata_cache = puffin_metadata_cache;
124 self
125 }
126
127 pub async fn apply(
129 &self,
130 file_id: RegionFileId,
131 file_size_hint: Option<u64>,
132 ) -> Result<ApplyOutput> {
133 let _timer = INDEX_APPLY_ELAPSED
134 .with_label_values(&[TYPE_INVERTED_INDEX])
135 .start_timer();
136
137 let context = SearchContext {
138 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
140 };
141
142 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
143 Ok(Some(puffin_reader)) => puffin_reader,
144 other => {
145 if let Err(err) = other {
146 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
147 }
148 self.remote_blob_reader(file_id, file_size_hint).await?
149 }
150 };
151
152 if let Some(index_cache) = &self.inverted_index_cache {
153 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
154 let mut index_reader = CachedInvertedIndexBlobReader::new(
155 file_id.file_id(),
156 blob_size,
157 InvertedIndexBlobReader::new(blob),
158 index_cache.clone(),
159 );
160 self.index_applier
161 .apply(context, &mut index_reader)
162 .await
163 .context(ApplyInvertedIndexSnafu)
164 } else {
165 let mut index_reader = InvertedIndexBlobReader::new(blob);
166 self.index_applier
167 .apply(context, &mut index_reader)
168 .await
169 .context(ApplyInvertedIndexSnafu)
170 }
171 }
172
173 async fn cached_blob_reader(
175 &self,
176 file_id: RegionFileId,
177 file_size_hint: Option<u64>,
178 ) -> Result<Option<BlobReader>> {
179 let Some(file_cache) = &self.file_cache else {
180 return Ok(None);
181 };
182
183 let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
184 if file_cache.get(index_key).await.is_none() {
185 return Ok(None);
186 };
187
188 let puffin_manager = self.puffin_manager_factory.build(
189 file_cache.local_store(),
190 WriteCachePathProvider::new(file_cache.clone()),
191 );
192
193 let reader = puffin_manager
195 .reader(&file_id)
196 .await
197 .context(PuffinBuildReaderSnafu)?
198 .with_file_size_hint(file_size_hint)
199 .blob(INDEX_BLOB_TYPE)
200 .await
201 .context(PuffinReadBlobSnafu)?
202 .reader()
203 .await
204 .context(PuffinBuildReaderSnafu)?;
205 Ok(Some(reader))
206 }
207
208 async fn remote_blob_reader(
210 &self,
211 file_id: RegionFileId,
212 file_size_hint: Option<u64>,
213 ) -> Result<BlobReader> {
214 let puffin_manager = self
215 .puffin_manager_factory
216 .build(
217 self.store.clone(),
218 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
219 )
220 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
221
222 puffin_manager
223 .reader(&file_id)
224 .await
225 .context(PuffinBuildReaderSnafu)?
226 .with_file_size_hint(file_size_hint)
227 .blob(INDEX_BLOB_TYPE)
228 .await
229 .context(PuffinReadBlobSnafu)?
230 .reader()
231 .await
232 .context(PuffinBuildReaderSnafu)
233 }
234
235 pub fn predicate_key(&self) -> &PredicateKey {
237 &self.predicate_key
238 }
239}
240
241impl Drop for InvertedIndexApplier {
242 fn drop(&mut self) {
243 INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
244 }
245}
246
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;
    use crate::sst::file::FileId;

    /// A puffin file containing a blob of the inverted index type is found, and the
    /// mocked applier's output is returned unchanged.
    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_temp_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let region_file_id = RegionFileId::new(0.into(), FileId::random());
        let table_dir = "table_dir".to_string();

        // Write a puffin file holding an empty blob under the inverted index blob type.
        let path_factory = RegionFilePathFactory::new(table_dir.clone(), PathType::Bare);
        let manager = factory.build(store.clone(), path_factory);
        let mut puffin_writer = manager.writer(&region_file_id).await.unwrap();
        puffin_writer
            .put_blob(
                INDEX_BLOB_TYPE,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        puffin_writer.finish().await.unwrap();

        let mut mock_applier = MockIndexApplier::new();
        mock_applier.expect_memory_usage().returning(|| 100);
        mock_applier.expect_apply().returning(|_, _| {
            Ok(ApplyOutput {
                matched_segment_ids: Bitmap::new_bitvec(),
                total_row_count: 100,
                segment_row_count: 10,
            })
        });

        let applier = InvertedIndexApplier::new(
            table_dir.clone(),
            PathType::Bare,
            store,
            Box::new(mock_applier),
            factory,
            Default::default(),
        );

        let expected = ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        };
        let actual = applier.apply(region_file_id, None).await.unwrap();
        assert_eq!(actual, expected);
    }

    /// A puffin file that only contains a blob under a different type yields a
    /// "Blob not found" error, and the applier is never invoked.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_temp_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let region_file_id = RegionFileId::new(0.into(), FileId::random());
        let table_dir = "table_dir".to_string();

        // Write a puffin file whose only blob is stored under an unrelated type name.
        let path_factory = RegionFilePathFactory::new(table_dir.clone(), PathType::Bare);
        let manager = factory.build(store.clone(), path_factory);
        let mut puffin_writer = manager.writer(&region_file_id).await.unwrap();
        puffin_writer
            .put_blob(
                "invalid_blob_type",
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        puffin_writer.finish().await.unwrap();

        let mut mock_applier = MockIndexApplier::new();
        mock_applier.expect_memory_usage().returning(|| 100);
        mock_applier.expect_apply().never();

        let applier = InvertedIndexApplier::new(
            table_dir.clone(),
            PathType::Bare,
            store,
            Box::new(mock_applier),
            factory,
            Default::default(),
        );

        let err = applier.apply(region_file_id, None).await.unwrap_err();
        let rendered = format!("{err:?}");
        assert!(rendered.contains("Blob not found"));
    }
}