use core::ops::Range;
use std::sync::Arc;
use std::time::Instant;

use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::{FileId, IndexVersion};

use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};

const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

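/// Cache for inverted index metadata and content, keyed by `(FileId, IndexVersion)`.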
pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
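/// A shared reference to an [`InvertedIndexCache`].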
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;

impl InvertedIndexCache {
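    /// Creates a cache whose metadata and content parts are capped at
    /// `index_metadata_cap` and `index_content_cap`, weighted by the byte-size
    /// estimators below, with content cached in pages of `page_size` bytes.
    ///
    /// A minimal construction sketch (the capacities here are arbitrary example
    /// values, not recommendations):
    ///
    /// ```ignore
    /// let cache = Arc::new(InvertedIndexCache::new(
    ///     64 * 1024 * 1024,  // metadata capacity in bytes
    ///     128 * 1024 * 1024, // content capacity in bytes
    ///     64 * 1024,         // page size in bytes
    /// ));
    /// ```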
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_INVERTED_INDEX,
            inverted_index_metadata_weight,
            inverted_index_content_weight,
        )
    }

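    /// Invalidates every cached entry whose key matches `file_id`, regardless of index version.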
    pub fn invalidate_file(&self, file_id: FileId) {
        self.invalidate_if(move |key| key.0 == file_id);
    }
}

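/// Approximate in-memory weight of a cached metadata entry: key size plus the encoded length of the metas.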
fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.encoded_len()) as u32
}

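/// Approximate in-memory weight of a cached content page: key size plus the page length.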
fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.len()) as u32
}

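/// An [`InvertedIndexReader`] wrapper that serves reads from the shared
/// [`InvertedIndexCache`], falling back to the inner reader on cache misses.
///
/// A minimal usage sketch (not compiled; assumes an existing `reader`, `cache`,
/// `file_id`, `version`, and `blob_size`):
///
/// ```ignore
/// let cached = CachedInvertedIndexBlobReader::new(file_id, version, blob_size, reader, cache);
/// let metas = cached.metadata(None).await?;
/// let bytes = cached.range_read(0, 1024, None).await?;
/// ```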
pub struct CachedInvertedIndexBlobReader<R> {
    file_id: FileId,
    index_version: IndexVersion,
    blob_size: u64,
    inner: R,
    cache: InvertedIndexCacheRef,
}

impl<R> CachedInvertedIndexBlobReader<R> {
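    /// Wraps `inner` so that its reads go through `cache`. `blob_size` is the
    /// total size of the underlying index blob.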
    pub fn new(
        file_id: FileId,
        index_version: IndexVersion,
        blob_size: u64,
        inner: R,
        cache: InvertedIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            index_version,
            blob_size,
            inner,
            cache,
        }
    }
}

#[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
    async fn range_read<'a>(
        &self,
        offset: u64,
        size: u32,
        metrics: Option<&'a mut InvertedIndexReadMetrics>,
    ) -> Result<Vec<u8>> {
        let start = metrics.as_ref().map(|_| Instant::now());

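        // Serve the read from the page cache, loading any missing pages from the inner reader.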
        let inner = &self.inner;
        let (result, cache_metrics) = self
            .cache
            .get_or_load(
                (self.file_id, self.index_version),
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges, None).await },
            )
            .await?;

        if let Some(m) = metrics {
            m.total_bytes += cache_metrics.page_bytes;
            m.total_ranges += cache_metrics.num_pages;
            m.cache_hit += cache_metrics.cache_hit;
            m.cache_miss += cache_metrics.cache_miss;
            m.fetch_elapsed += start.unwrap().elapsed();
        }

        Ok(result)
    }

    async fn read_vec<'a>(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&'a mut InvertedIndexReadMetrics>,
    ) -> Result<Vec<Bytes>> {
        let start = metrics.as_ref().map(|_| Instant::now());

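        // Read each requested range through the page cache, accumulating the
        // cache metrics across all ranges.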
        let mut pages = Vec::with_capacity(ranges.len());
        let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
        for range in ranges {
            let inner = &self.inner;
            let (page, cache_metrics) = self
                .cache
                .get_or_load(
                    (self.file_id, self.index_version),
                    self.blob_size,
                    range.start,
                    (range.end - range.start) as u32,
                    move |ranges| async move { inner.read_vec(&ranges, None).await },
                )
                .await?;

            total_cache_metrics.merge(&cache_metrics);
            pages.push(Bytes::from(page));
        }

        if let Some(m) = metrics {
            m.total_bytes += total_cache_metrics.page_bytes;
            m.total_ranges += total_cache_metrics.num_pages;
            m.cache_hit += total_cache_metrics.cache_hit;
            m.cache_miss += total_cache_metrics.cache_miss;
            m.fetch_elapsed += start.unwrap().elapsed();
        }

        Ok(pages)
    }

    async fn metadata<'a>(
        &self,
        metrics: Option<&'a mut InvertedIndexReadMetrics>,
    ) -> Result<Arc<InvertedIndexMetas>> {
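        // Check the metadata cache first; on a miss, load from the inner reader
        // and populate the cache.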
        if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            if let Some(m) = metrics {
                m.cache_hit += 1;
            }
            Ok(cached)
        } else {
            let meta = self.inner.metadata(metrics).await?;
            self.cache
                .put_metadata((self.file_id, self.index_version), meta.clone());
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}

#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::Bytes;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    const FUZZ_REPEAT_TIMES: usize = 100;

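    // Checks that `PageKey::generate_page_keys` and `PageKey::calculate_range`
    // agree: concatenating the generated pages and slicing with the calculated
    // range must reproduce exactly the requested byte range.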
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            if read != data.get(expected_range).unwrap() {
                panic!(
                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                    offset,
                    size,
                    page_size,
                    read.len(),
                    size as usize,
                    PageKey::calculate_range(offset, size, page_size as u64),
                    page_num
                );
            }
        }
    }

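    // Splits a packed FST value into its `[offset, size]` halves.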
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

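    // Builds a small two-tag inverted index blob in memory for the cache tests.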
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                index::bitmap::BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                index::bitmap::BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
        let index_version = 0;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader("data", &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            index_version,
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata(None).await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
                None,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        let [offset, size] = unpack(fst0.get(b"a").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag0.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"b").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag0.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"c").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag0.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
                None,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        let [offset, size] = unpack(fst1.get(b"x").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag1.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"y").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag1.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"z").unwrap());
        let bitmap = cached_reader
            .bitmap(
                tag1.base_offset + offset as u64,
                size,
                BitmapType::Roaring,
                None,
            )
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

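        // Fuzz: reading random ranges through `range_read` must match loading
        // the same ranges via the cache directly.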
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size, None).await.unwrap();
            let inner = &cached_reader.inner;
            let (read, _cache_metrics) = cached_reader
                .cache
                .get_or_load(
                    (cached_reader.file_id, cached_reader.index_version),
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges, None).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}