1use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::inverted_index::error::Result;
22use index::inverted_index::format::reader::InvertedIndexReader;
23use prost::Message;
24use store_api::storage::FileId;
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Cache-type label used in metrics to distinguish inverted-index entries.
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache for inverted index metadata and content pages, keyed by [`FileId`].
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
/// Shared reference to an [`InvertedIndexCache`].
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
34
35impl InvertedIndexCache {
36 pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
38 Self::new_with_weighter(
39 index_metadata_cap,
40 index_content_cap,
41 page_size,
42 INDEX_TYPE_INVERTED_INDEX,
43 inverted_index_metadata_weight,
44 inverted_index_content_weight,
45 )
46 }
47
48 pub fn invalidate_file(&self, file_id: FileId) {
50 self.invalidate_if(move |key| *key == file_id);
51 }
52}
53
54fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
56 (k.as_bytes().len() + v.encoded_len()) as u32
57}
58
59fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
61 (k.as_bytes().len() + v.len()) as u32
62}
63
/// Inverted index blob reader with a cache layer: page reads and metadata
/// lookups are served from `cache` when possible, falling back to `inner`.
pub struct CachedInvertedIndexBlobReader<R> {
    // Identifies the index file; used as the cache key.
    file_id: FileId,
    // Total size in bytes of the underlying blob.
    blob_size: u64,
    // The underlying (uncached) reader used on cache misses.
    inner: R,
    // Shared cache for index metadata and content pages.
    cache: InvertedIndexCacheRef,
}
71
72impl<R> CachedInvertedIndexBlobReader<R> {
73 pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
75 Self {
76 file_id,
77 blob_size,
78 inner,
79 cache,
80 }
81 }
82}
83
84#[async_trait]
85impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
86 async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
87 let inner = &self.inner;
88 self.cache
89 .get_or_load(
90 self.file_id,
91 self.blob_size,
92 offset,
93 size,
94 move |ranges| async move { inner.read_vec(&ranges).await },
95 )
96 .await
97 }
98
99 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
100 let mut pages = Vec::with_capacity(ranges.len());
101 for range in ranges {
102 let inner = &self.inner;
103 let page = self
104 .cache
105 .get_or_load(
106 self.file_id,
107 self.blob_size,
108 range.start,
109 (range.end - range.start) as u32,
110 move |ranges| async move { inner.read_vec(&ranges).await },
111 )
112 .await?;
113
114 pages.push(Bytes::from(page));
115 }
116
117 Ok(pages)
118 }
119
120 async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
121 if let Some(cached) = self.cache.get_metadata(self.file_id) {
122 CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
123 Ok(cached)
124 } else {
125 let meta = self.inner.metadata().await?;
126 self.cache.put_metadata(self.file_id, meta.clone());
127 CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
128 Ok(meta)
129 }
130 }
131}
132
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::Bytes;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzzes the `PageKey` page arithmetic: reassembling the pages produced
    /// by `generate_page_keys` and slicing with `calculate_range` must yield
    /// exactly the requested `[offset, offset + size)` byte range.
    #[test]
    fn fuzz_index_calculation() {
        // Random in-memory blob to read pages from.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The final page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            if read != data.get(expected_range).unwrap() {
                panic!(
                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                    offset,
                    size,
                    page_size,
                    read.len(),
                    size as usize,
                    PageKey::calculate_range(offset, size, page_size as u64),
                    page_num
                );
            }
        }
    }

    /// Splits a packed FST value into two `u32` halves.
    ///
    /// NOTE(review): `bytemuck::cast` reinterprets the bits in native
    /// endianness; this matches how the writer packs values on the same host.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds an in-memory inverted index blob with two tags ("tag0" with
    /// values a/b/c, "tag1" with x/y/z), 8 rows, segment size 1.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    /// End-to-end test: writes an index blob to object storage, reads it back
    /// through `CachedInvertedIndexBlobReader`, and verifies metadata, FSTs,
    /// bitmaps, and random cached range reads against the uncached reader.
    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Persist the blob and wrap the store with byte counters so the
        // instrumented range reader can be constructed.
        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader("data", &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        // Verify stats, FST, and per-value bitmaps for "tag0".
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        for (key, expected) in [
            (&b"a"[..], [0b0000_0001u8]),
            (&b"b"[..], [0b0010_0000]),
            (&b"c"[..], [0b0000_0001]),
        ] {
            let [offset, size] = unpack(fst0.get(key).unwrap());
            let bitmap = cached_reader
                .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
                .await
                .unwrap();
            assert_eq!(
                bitmap,
                Bitmap::from_lsb0_bytes(&expected, BitmapType::Roaring)
            );
        }

        // Verify stats, FST, and per-value bitmaps for "tag1".
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        for (key, expected) in [
            (&b"x"[..], [0b0000_0001u8]),
            (&b"y"[..], [0b0010_0000]),
            (&b"z"[..], [0b0000_0001]),
        ] {
            let [offset, size] = unpack(fst1.get(key).unwrap());
            let bitmap = cached_reader
                .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
                .await
                .unwrap();
            assert_eq!(
                bitmap,
                Bitmap::from_lsb0_bytes(&expected, BitmapType::Roaring)
            );
        }

        // Fuzz: random cached range reads must match going through the cache
        // loader directly.
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size).await.unwrap();
            let inner = &cached_reader.inner;
            let read = cached_reader
                .cache
                .get_or_load(
                    cached_reader.file_id,
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}