mito2/sst/index/inverted_index/
applier.rs1pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
24use index::inverted_index::search::index_apply::{
25 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
26};
27use index::inverted_index::search::predicate::Predicate;
28use object_store::ObjectStore;
29use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
30use puffin::puffin_manager::{PuffinManager, PuffinReader};
31use snafu::ResultExt;
32use store_api::region_request::PathType;
33use store_api::storage::ColumnId;
34
35use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
36use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40 ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
43use crate::sst::file::RegionIndexId;
44use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
46use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
47
48#[derive(Default, Clone)]
50pub struct InvertedIndexApplyMetrics {
51 pub apply_elapsed: std::time::Duration,
53 pub blob_cache_miss: usize,
55 pub blob_read_bytes: u64,
57 pub inverted_index_read_metrics: InvertedIndexReadMetrics,
59}
60
61impl std::fmt::Debug for InvertedIndexApplyMetrics {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 let Self {
64 apply_elapsed,
65 blob_cache_miss,
66 blob_read_bytes,
67 inverted_index_read_metrics,
68 } = self;
69
70 if self.is_empty() {
71 return write!(f, "{{}}");
72 }
73 write!(f, "{{")?;
74
75 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
76
77 if *blob_cache_miss > 0 {
78 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
79 }
80 if *blob_read_bytes > 0 {
81 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
82 }
83 write!(
84 f,
85 ", \"inverted_index_read_metrics\":{:?}",
86 inverted_index_read_metrics
87 )?;
88
89 write!(f, "}}")
90 }
91}
92
93impl InvertedIndexApplyMetrics {
94 pub fn is_empty(&self) -> bool {
96 self.apply_elapsed.is_zero()
97 }
98
99 pub fn merge_from(&mut self, other: &Self) {
101 self.apply_elapsed += other.apply_elapsed;
102 self.blob_cache_miss += other.blob_cache_miss;
103 self.blob_read_bytes += other.blob_read_bytes;
104 self.inverted_index_read_metrics
105 .merge_from(&other.inverted_index_read_metrics);
106 }
107}
108
109pub(crate) struct InvertedIndexApplier {
112 table_dir: String,
114
115 path_type: PathType,
117
118 store: ObjectStore,
120
121 file_cache: Option<FileCacheRef>,
123
124 index_applier: Box<dyn IndexApplier>,
127
128 puffin_manager_factory: PuffinManagerFactory,
130
131 inverted_index_cache: Option<InvertedIndexCacheRef>,
133
134 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
136
137 predicate_key: PredicateKey,
139}
140
141pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
142
143impl InvertedIndexApplier {
144 pub fn new(
146 table_dir: String,
147 path_type: PathType,
148 store: ObjectStore,
149 index_applier: Box<dyn IndexApplier>,
150 puffin_manager_factory: PuffinManagerFactory,
151 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
152 ) -> Self {
153 INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
154
155 Self {
156 table_dir,
157 path_type,
158 store,
159 file_cache: None,
160 index_applier,
161 puffin_manager_factory,
162 inverted_index_cache: None,
163 puffin_metadata_cache: None,
164 predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
165 }
166 }
167
168 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170 self.file_cache = file_cache;
171 self
172 }
173
174 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
176 self.inverted_index_cache = index_cache;
177 self
178 }
179
180 pub fn with_puffin_metadata_cache(
182 mut self,
183 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
184 ) -> Self {
185 self.puffin_metadata_cache = puffin_metadata_cache;
186 self
187 }
188
189 #[tracing::instrument(
196 skip_all,
197 fields(file_id = %file_id)
198 )]
199 pub async fn apply(
200 &self,
201 file_id: RegionIndexId,
202 file_size_hint: Option<u64>,
203 mut metrics: Option<&mut InvertedIndexApplyMetrics>,
204 ) -> Result<ApplyOutput> {
205 let start = Instant::now();
206
207 let context = SearchContext {
208 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
210 };
211
212 let mut cache_miss = 0;
213 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
214 Ok(Some(puffin_reader)) => puffin_reader,
215 other => {
216 cache_miss += 1;
217 if let Err(err) = other {
218 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
219 }
220 self.remote_blob_reader(file_id, file_size_hint).await?
221 }
222 };
223
224 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
225
226 let result = if let Some(index_cache) = &self.inverted_index_cache {
227 let mut index_reader = CachedInvertedIndexBlobReader::new(
228 file_id.file_id(),
229 file_id.version,
230 blob_size,
231 InvertedIndexBlobReader::new(blob),
232 index_cache.clone(),
233 );
234 self.index_applier
235 .apply(
236 context,
237 &mut index_reader,
238 metrics
239 .as_deref_mut()
240 .map(|m| &mut m.inverted_index_read_metrics),
241 )
242 .await
243 .context(ApplyInvertedIndexSnafu)
244 } else {
245 let mut index_reader = InvertedIndexBlobReader::new(blob);
246 self.index_applier
247 .apply(
248 context,
249 &mut index_reader,
250 metrics
251 .as_deref_mut()
252 .map(|m| &mut m.inverted_index_read_metrics),
253 )
254 .await
255 .context(ApplyInvertedIndexSnafu)
256 };
257
258 let elapsed = start.elapsed();
260 INDEX_APPLY_ELAPSED
261 .with_label_values(&[TYPE_INVERTED_INDEX])
262 .observe(elapsed.as_secs_f64());
263
264 if let Some(metrics) = metrics {
265 metrics.apply_elapsed = elapsed;
266 metrics.blob_cache_miss = cache_miss;
267 metrics.blob_read_bytes = blob_size;
268 }
269
270 result
271 }
272
273 async fn cached_blob_reader(
275 &self,
276 file_id: RegionIndexId,
277 file_size_hint: Option<u64>,
278 ) -> Result<Option<BlobReader>> {
279 let Some(file_cache) = &self.file_cache else {
280 return Ok(None);
281 };
282
283 let index_key = IndexKey::new(
284 file_id.region_id(),
285 file_id.file_id(),
286 FileType::Puffin(file_id.version),
287 );
288 if file_cache.get(index_key).await.is_none() {
289 return Ok(None);
290 };
291
292 let puffin_manager = self.puffin_manager_factory.build(
293 file_cache.local_store(),
294 WriteCachePathProvider::new(file_cache.clone()),
295 );
296
297 let reader = puffin_manager
299 .reader(&file_id)
300 .await
301 .context(PuffinBuildReaderSnafu)?
302 .with_file_size_hint(file_size_hint)
303 .blob(INDEX_BLOB_TYPE)
304 .await
305 .context(PuffinReadBlobSnafu)?
306 .reader()
307 .await
308 .context(PuffinBuildReaderSnafu)?;
309 Ok(Some(reader))
310 }
311
312 async fn remote_blob_reader(
314 &self,
315 file_id: RegionIndexId,
316 file_size_hint: Option<u64>,
317 ) -> Result<BlobReader> {
318 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
319
320 trigger_index_background_download(
322 self.file_cache.as_ref(),
323 &file_id,
324 file_size_hint,
325 &path_factory,
326 &self.store,
327 );
328
329 let puffin_manager = self
330 .puffin_manager_factory
331 .build(self.store.clone(), path_factory)
332 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
333
334 puffin_manager
335 .reader(&file_id)
336 .await
337 .context(PuffinBuildReaderSnafu)?
338 .with_file_size_hint(file_size_hint)
339 .blob(INDEX_BLOB_TYPE)
340 .await
341 .context(PuffinReadBlobSnafu)?
342 .reader()
343 .await
344 .context(PuffinBuildReaderSnafu)
345 }
346
347 pub fn predicate_key(&self) -> &PredicateKey {
349 &self.predicate_key
350 }
351}
352
353impl Drop for InvertedIndexApplier {
354 fn drop(&mut self) {
355 INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
356 }
357}
358
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::index::RegionFileId;

    /// Table directory shared by the writer and the applier in every test.
    const TABLE_DIR: &str = "table_dir";

    /// The output the mocked index applier returns and the test expects back.
    fn sample_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    /// Creates a fresh index id in region 0 with a random file id and version 0.
    fn random_index_id() -> RegionIndexId {
        let file_id = RegionFileId::new(0.into(), FileId::random());
        RegionIndexId::new(file_id, 0)
    }

    /// Writes a puffin file for `index_id` that holds a single empty blob
    /// stored under `blob_key`.
    async fn write_empty_blob(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        index_id: RegionIndexId,
        blob_key: &str,
    ) {
        let puffin_manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
        writer
            .put_blob(
                blob_key,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// With a blob of the expected type present, `apply` returns whatever the
    /// underlying index applier produces.
    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let index_id = random_index_id();

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            index_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier
            .expect_apply()
            .returning(|_, _, _| Ok(sample_output()));

        let sst_index_applier = InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
            Default::default(),
        );

        let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
        assert_eq!(output, sample_output());
    }

    /// When the puffin file lacks a blob of the expected type, `apply` fails
    /// with a "Blob not found" error and never calls the index applier.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let index_id = random_index_id();

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            index_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier.expect_apply().never();

        let sst_index_applier = InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
            Default::default(),
        );

        let res = sst_index_applier.apply(index_id, None, None).await;
        let rendered = format!("{:?}", res.unwrap_err());
        assert!(rendered.contains("Blob not found"));
    }
}