1use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures::future::try_join_all;
22use index::inverted_index::error::Result;
23use index::inverted_index::format::reader::InvertedIndexReader;
24use prost::Message;
25
26use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28use crate::sst::file::FileId;
29
/// Metric label value identifying the inverted index cache in cache metrics.
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache for inverted index metadata and content pages, keyed by [`FileId`].
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
/// Shared, reference-counted handle to an [`InvertedIndexCache`].
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
35
36impl InvertedIndexCache {
37 pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
39 Self::new_with_weighter(
40 index_metadata_cap,
41 index_content_cap,
42 page_size,
43 INDEX_TYPE_INVERTED_INDEX,
44 inverted_index_metadata_weight,
45 inverted_index_content_weight,
46 )
47 }
48}
49
50fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
52 (k.as_bytes().len() + v.encoded_len()) as u32
53}
54
55fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
57 (k.as_bytes().len() + v.len()) as u32
58}
59
/// An [`InvertedIndexReader`] wrapper that serves reads through an
/// [`InvertedIndexCache`], falling back to the inner reader on cache miss.
pub struct CachedInvertedIndexBlobReader<R> {
    // Id of the file the blob belongs to; used as the cache key.
    file_id: FileId,
    // Total size of the underlying blob in bytes, used to clamp page reads.
    blob_size: u64,
    // Underlying reader consulted when pages are not cached.
    inner: R,
    // Shared cache of index metadata and content pages.
    cache: InvertedIndexCacheRef,
}
67
68impl<R> CachedInvertedIndexBlobReader<R> {
69 pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
71 Self {
72 file_id,
73 blob_size,
74 inner,
75 cache,
76 }
77 }
78}
79
#[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
    // Reads `size` bytes at `offset`, serving pages from the cache and
    // loading any missing pages from the inner reader in one batch.
    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
        // Re-borrow so the closure captures `&R` rather than `&self`.
        let inner = &self.inner;
        self.cache
            .get_or_load(
                self.file_id,
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
    }

    // Reads multiple byte ranges concurrently; each range goes through the
    // cache independently and the results are joined in input order.
    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        let fetch = ranges.iter().map(|range| {
            let inner = &self.inner;
            self.cache.get_or_load(
                self.file_id,
                self.blob_size,
                range.start,
                (range.end - range.start) as u32,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
        });
        // try_join_all preserves order and short-circuits on the first error.
        Ok(try_join_all(fetch)
            .await?
            .into_iter()
            .map(Bytes::from)
            .collect::<Vec<_>>())
    }

    // Returns the index metadata, caching it per file id. The hit/miss
    // counters are labelled with the metadata cache type.
    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
        if let Some(cached) = self.cache.get_metadata(self.file_id) {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(cached)
        } else {
            // Miss: fetch from the inner reader, then populate the cache.
            let meta = self.inner.metadata().await?;
            self.cache.put_metadata(self.file_id, meta.clone());
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}
125
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use index::Bytes;
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzzes `PageKey` page splitting: reassembling the pages that cover a
    /// random `(offset, size)` read must yield exactly the requested bytes.
    #[test]
    fn fuzz_index_calculation() {
        // Randomly generate a large byte array to read from.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The last page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            if read != data.get(expected_range).unwrap() {
                panic!(
                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                    offset, size, page_size, read.len(), size as usize,
                    PageKey::calculate_range(offset, size, page_size as u64),
                    page_num
                );
            }
        }
    }

    /// Splits a packed FST value into its `[offset, size]` halves.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds a small two-tag inverted index blob used by the cache test.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        // 8 total rows, 1 row per segment.
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    /// End-to-end check: the cached reader must return the same metadata,
    /// FSTs, and bitmaps as the raw blob, then `range_read` is fuzzed against
    /// direct `get_or_load` calls for random offsets/sizes.
    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Init a test range reader over a local object store.
        let mut env = TestEnv::new();
        let file_size = blob.len() as u64;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader("data", &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);
        // Verify tag0: stats, FST entries, and each value's bitmap.
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        let [offset, size] = unpack(fst0.get(b"a").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"b").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"c").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // Verify tag1: stats, FST entries, and each value's bitmap.
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        let [offset, size] = unpack(fst1.get(b"x").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"y").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"z").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // Fuzz: cached range_read must match a direct get_or_load bypass.
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size).await.unwrap();
            let inner = &cached_reader.inner;
            let read = cached_reader
                .cache
                .get_or_load(
                    cached_reader.file_id,
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}