1use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::inverted_index::error::Result;
22use index::inverted_index::format::reader::InvertedIndexReader;
23use prost::Message;
24use store_api::storage::FileId;
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Metric label value identifying inverted-index entries in cache hit/miss counters.
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache of inverted index metadata and content pages, keyed by SST file id.
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
/// Shared, reference-counted handle to an [`InvertedIndexCache`].
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
34
impl InvertedIndexCache {
    /// Creates a new inverted index cache.
    ///
    /// * `index_metadata_cap` - capacity of the metadata cache, in weighted bytes
    ///   (weights are computed by [`inverted_index_metadata_weight`]).
    /// * `index_content_cap` - capacity of the content (page) cache, in weighted bytes
    ///   (weights are computed by [`inverted_index_content_weight`]).
    /// * `page_size` - size of each cached content page, in bytes.
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_INVERTED_INDEX,
            inverted_index_metadata_weight,
            inverted_index_content_weight,
        )
    }
}
48
49fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
51 (k.as_bytes().len() + v.encoded_len()) as u32
52}
53
54fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
56 (k.as_bytes().len() + v.len()) as u32
57}
58
/// An [`InvertedIndexReader`] wrapper that serves reads from an
/// [`InvertedIndexCache`], falling back to the inner reader on cache misses.
pub struct CachedInvertedIndexBlobReader<R> {
    // Id of the SST file the index blob belongs to; part of the cache key.
    file_id: FileId,
    // Total size of the index blob, in bytes.
    blob_size: u64,
    // Underlying reader used to load pages/metadata on cache misses.
    inner: R,
    // Shared cache of index metadata and content pages.
    cache: InvertedIndexCacheRef,
}
66
67impl<R> CachedInvertedIndexBlobReader<R> {
68 pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
70 Self {
71 file_id,
72 blob_size,
73 inner,
74 cache,
75 }
76 }
77}
78
79#[async_trait]
80impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
81 async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
82 let inner = &self.inner;
83 self.cache
84 .get_or_load(
85 self.file_id,
86 self.blob_size,
87 offset,
88 size,
89 move |ranges| async move { inner.read_vec(&ranges).await },
90 )
91 .await
92 }
93
94 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
95 let mut pages = Vec::with_capacity(ranges.len());
96 for range in ranges {
97 let inner = &self.inner;
98 let page = self
99 .cache
100 .get_or_load(
101 self.file_id,
102 self.blob_size,
103 range.start,
104 (range.end - range.start) as u32,
105 move |ranges| async move { inner.read_vec(&ranges).await },
106 )
107 .await?;
108
109 pages.push(Bytes::from(page));
110 }
111
112 Ok(pages)
113 }
114
115 async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
116 if let Some(cached) = self.cache.get_metadata(self.file_id) {
117 CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
118 Ok(cached)
119 } else {
120 let meta = self.inner.metadata().await?;
121 self.cache.put_metadata(self.file_id, meta.clone());
122 CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
123 Ok(meta)
124 }
125 }
126}
127
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::Bytes;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzzes `PageKey::generate_page_keys` / `PageKey::calculate_range`:
    /// reassembling the generated pages and slicing with the calculated range
    /// must reproduce exactly the requested `[offset, offset + size)` bytes.
    #[test]
    fn fuzz_index_calculation() {
        // 1 MiB of random bytes stands in for the blob content.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The last page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            // Fixed: was `size as u64 as u64` (redundant double cast).
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            if read != data.get(expected_range).unwrap() {
                panic!(
                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                    offset,
                    size,
                    page_size,
                    read.len(),
                    size as usize,
                    PageKey::calculate_range(offset, size, page_size as u64),
                    page_num
                );
            }
        }
    }

    /// Splits a packed FST value into two `u32` halves
    /// (relative bitmap offset and bitmap size).
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds an inverted index blob with two tags ("tag0", "tag1"),
    /// three values each, over 8 rows with 1-row segments.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        // 8 total rows, 1 row per segment.
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Write the blob to the object store and open an instrumented reader over it.
        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader("data", &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);
        // tag0: stats, FST lookups, and bitmaps must match what was written.
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        let [offset, size] = unpack(fst0.get(b"a").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"b").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"c").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // tag1: same checks as tag0.
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        let [offset, size] = unpack(fst1.get(b"x").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"y").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"z").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // Fuzz: cache-mediated reads must agree with `range_read` for random ranges.
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size).await.unwrap();
            let inner = &cached_reader.inner;
            let read = cached_reader
                .cache
                .get_or_load(
                    cached_reader.file_id,
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}