mito2/sst/index/inverted_index/
applier.rs1pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
24use index::inverted_index::search::index_apply::{
25 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
26};
27use index::inverted_index::search::predicate::Predicate;
28use object_store::ObjectStore;
29use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
30use puffin::puffin_manager::{PuffinManager, PuffinReader};
31use snafu::ResultExt;
32use store_api::region_request::PathType;
33use store_api::storage::ColumnId;
34
35use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
36use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40 ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
43use crate::sst::file::RegionFileId;
44use crate::sst::index::TYPE_INVERTED_INDEX;
45use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
46use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
47
48#[derive(Default, Clone)]
50pub struct InvertedIndexApplyMetrics {
51 pub apply_elapsed: std::time::Duration,
53 pub blob_cache_miss: usize,
55 pub blob_read_bytes: u64,
57 pub inverted_index_read_metrics: InvertedIndexReadMetrics,
59}
60
61impl std::fmt::Debug for InvertedIndexApplyMetrics {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 let Self {
64 apply_elapsed,
65 blob_cache_miss,
66 blob_read_bytes,
67 inverted_index_read_metrics,
68 } = self;
69
70 if self.is_empty() {
71 return write!(f, "{{}}");
72 }
73 write!(f, "{{")?;
74
75 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
76
77 if *blob_cache_miss > 0 {
78 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
79 }
80 if *blob_read_bytes > 0 {
81 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
82 }
83 write!(
84 f,
85 ", \"inverted_index_read_metrics\":{:?}",
86 inverted_index_read_metrics
87 )?;
88
89 write!(f, "}}")
90 }
91}
92
93impl InvertedIndexApplyMetrics {
94 pub fn is_empty(&self) -> bool {
96 self.apply_elapsed.is_zero()
97 }
98
99 pub fn merge_from(&mut self, other: &Self) {
101 self.apply_elapsed += other.apply_elapsed;
102 self.blob_cache_miss += other.blob_cache_miss;
103 self.blob_read_bytes += other.blob_read_bytes;
104 self.inverted_index_read_metrics
105 .merge_from(&other.inverted_index_read_metrics);
106 }
107}
108
109pub(crate) struct InvertedIndexApplier {
112 table_dir: String,
114
115 path_type: PathType,
117
118 store: ObjectStore,
120
121 file_cache: Option<FileCacheRef>,
123
124 index_applier: Box<dyn IndexApplier>,
127
128 puffin_manager_factory: PuffinManagerFactory,
130
131 inverted_index_cache: Option<InvertedIndexCacheRef>,
133
134 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
136
137 predicate_key: PredicateKey,
139}
140
141pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
142
143impl InvertedIndexApplier {
144 pub fn new(
146 table_dir: String,
147 path_type: PathType,
148 store: ObjectStore,
149 index_applier: Box<dyn IndexApplier>,
150 puffin_manager_factory: PuffinManagerFactory,
151 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
152 ) -> Self {
153 INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
154
155 Self {
156 table_dir,
157 path_type,
158 store,
159 file_cache: None,
160 index_applier,
161 puffin_manager_factory,
162 inverted_index_cache: None,
163 puffin_metadata_cache: None,
164 predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
165 }
166 }
167
168 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170 self.file_cache = file_cache;
171 self
172 }
173
174 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
176 self.inverted_index_cache = index_cache;
177 self
178 }
179
180 pub fn with_puffin_metadata_cache(
182 mut self,
183 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
184 ) -> Self {
185 self.puffin_metadata_cache = puffin_metadata_cache;
186 self
187 }
188
189 pub async fn apply(
196 &self,
197 file_id: RegionFileId,
198 file_size_hint: Option<u64>,
199 mut metrics: Option<&mut InvertedIndexApplyMetrics>,
200 ) -> Result<ApplyOutput> {
201 let start = Instant::now();
202
203 let context = SearchContext {
204 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
206 };
207
208 let mut cache_miss = 0;
209 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
210 Ok(Some(puffin_reader)) => puffin_reader,
211 other => {
212 cache_miss += 1;
213 if let Err(err) = other {
214 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
215 }
216 self.remote_blob_reader(file_id, file_size_hint).await?
217 }
218 };
219
220 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
221
222 let result = if let Some(index_cache) = &self.inverted_index_cache {
223 let mut index_reader = CachedInvertedIndexBlobReader::new(
224 file_id.file_id(),
225 blob_size,
226 InvertedIndexBlobReader::new(blob),
227 index_cache.clone(),
228 );
229 self.index_applier
230 .apply(
231 context,
232 &mut index_reader,
233 metrics
234 .as_deref_mut()
235 .map(|m| &mut m.inverted_index_read_metrics),
236 )
237 .await
238 .context(ApplyInvertedIndexSnafu)
239 } else {
240 let mut index_reader = InvertedIndexBlobReader::new(blob);
241 self.index_applier
242 .apply(
243 context,
244 &mut index_reader,
245 metrics
246 .as_deref_mut()
247 .map(|m| &mut m.inverted_index_read_metrics),
248 )
249 .await
250 .context(ApplyInvertedIndexSnafu)
251 };
252
253 let elapsed = start.elapsed();
255 INDEX_APPLY_ELAPSED
256 .with_label_values(&[TYPE_INVERTED_INDEX])
257 .observe(elapsed.as_secs_f64());
258
259 if let Some(metrics) = metrics {
260 metrics.apply_elapsed = elapsed;
261 metrics.blob_cache_miss = cache_miss;
262 metrics.blob_read_bytes = blob_size;
263 }
264
265 result
266 }
267
268 async fn cached_blob_reader(
270 &self,
271 file_id: RegionFileId,
272 file_size_hint: Option<u64>,
273 ) -> Result<Option<BlobReader>> {
274 let Some(file_cache) = &self.file_cache else {
275 return Ok(None);
276 };
277
278 let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
279 if file_cache.get(index_key).await.is_none() {
280 return Ok(None);
281 };
282
283 let puffin_manager = self.puffin_manager_factory.build(
284 file_cache.local_store(),
285 WriteCachePathProvider::new(file_cache.clone()),
286 );
287
288 let reader = puffin_manager
290 .reader(&file_id)
291 .await
292 .context(PuffinBuildReaderSnafu)?
293 .with_file_size_hint(file_size_hint)
294 .blob(INDEX_BLOB_TYPE)
295 .await
296 .context(PuffinReadBlobSnafu)?
297 .reader()
298 .await
299 .context(PuffinBuildReaderSnafu)?;
300 Ok(Some(reader))
301 }
302
303 async fn remote_blob_reader(
305 &self,
306 file_id: RegionFileId,
307 file_size_hint: Option<u64>,
308 ) -> Result<BlobReader> {
309 let puffin_manager = self
310 .puffin_manager_factory
311 .build(
312 self.store.clone(),
313 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
314 )
315 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
316
317 puffin_manager
318 .reader(&file_id)
319 .await
320 .context(PuffinBuildReaderSnafu)?
321 .with_file_size_hint(file_size_hint)
322 .blob(INDEX_BLOB_TYPE)
323 .await
324 .context(PuffinReadBlobSnafu)?
325 .reader()
326 .await
327 .context(PuffinBuildReaderSnafu)
328 }
329
330 pub fn predicate_key(&self) -> &PredicateKey {
332 &self.predicate_key
333 }
334}
335
336impl Drop for InvertedIndexApplier {
337 fn drop(&mut self) {
338 INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
339 }
340}
341
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::storage::FileId;

    use super::*;

    /// Table directory shared by every test in this module.
    const TABLE_DIR: &str = "table_dir";

    /// Writes a puffin file for `file_id` under `TABLE_DIR`. Its only blob is an
    /// empty payload stored under `blob_type`.
    async fn write_puffin_with_single_blob(
        puffin_manager_factory: &PuffinManagerFactory,
        object_store: &ObjectStore,
        file_id: RegionFileId,
        blob_type: &str,
    ) {
        let puffin_manager = puffin_manager_factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = puffin_manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Builds an applier over `TABLE_DIR` that reads from `object_store`.
    fn new_applier(
        object_store: ObjectStore,
        index_applier: MockIndexApplier,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> InvertedIndexApplier {
        InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            Box::new(index_applier),
            puffin_manager_factory,
            Default::default(),
        )
    }

    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());
        write_puffin_with_single_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier.expect_apply().returning(|_, _, _| {
            Ok(ApplyOutput {
                matched_segment_ids: Bitmap::new_bitvec(),
                total_row_count: 100,
                segment_row_count: 10,
            })
        });

        let sst_index_applier =
            new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
        assert_eq!(
            output,
            ApplyOutput {
                matched_segment_ids: Bitmap::new_bitvec(),
                total_row_count: 100,
                segment_row_count: 10,
            }
        );
    }

    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());
        write_puffin_with_single_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        // The blob lookup must fail before the index applier is ever reached.
        mock_index_applier.expect_apply().never();

        let sst_index_applier =
            new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let res = sst_index_applier.apply(file_id, None, None).await;
        assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
    }
}