1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26 FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
/// Label value attached to every cache metric reported from this file
/// (`CACHE_BYTES`, `CACHE_EVICTION`, `CACHE_HIT` and `CACHE_MISS`).
const INDEX_RESULT_TYPE: &str = "index_result";
31
32pub struct IndexResultCache {
39 cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
40}
41
42impl IndexResultCache {
43 pub fn new(capacity: u64) -> Self {
45 fn to_str(cause: RemovalCause) -> &'static str {
46 match cause {
47 RemovalCause::Expired => "expired",
48 RemovalCause::Explicit => "explicit",
49 RemovalCause::Replaced => "replaced",
50 RemovalCause::Size => "size",
51 }
52 }
53
54 let cache = Cache::builder()
55 .max_capacity(capacity)
56 .weigher(Self::index_result_cache_weight)
57 .eviction_listener(|k, v, cause| {
58 let size = Self::index_result_cache_weight(&k, &v);
59 CACHE_BYTES
60 .with_label_values(&[INDEX_RESULT_TYPE])
61 .sub(size.into());
62 CACHE_EVICTION
63 .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64 .inc();
65 })
66 .support_invalidation_closures()
67 .build();
68 Self { cache }
69 }
70
71 pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75 let key = (key, file_id);
76 let size = Self::index_result_cache_weight(&key, &result);
77 CACHE_BYTES
78 .with_label_values(&[INDEX_RESULT_TYPE])
79 .add(size.into());
80 self.cache.insert(key, result);
81 }
82
83 pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88 let res = self.cache.get(&(key.clone(), file_id));
89 if res.is_some() {
90 CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91 } else {
92 CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93 }
94 res
95 }
96
97 fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99 k.0.mem_usage() as u32 + v.mem_usage() as u32
100 }
101
102 pub fn invalidate_file(&self, file_id: FileId) {
104 self.cache
105 .invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id)
106 .expect("cache should support invalidation closures");
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Hash)]
112pub enum PredicateKey {
113 Fulltext(FulltextIndexKey),
115 Bloom(BloomFilterKey),
117 Inverted(InvertedIndexKey),
119}
120
121impl PredicateKey {
122 pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
124 Self::Fulltext(FulltextIndexKey::new(predicates))
125 }
126
127 pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
129 Self::Bloom(BloomFilterKey::new(predicates))
130 }
131
132 pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
134 Self::Inverted(InvertedIndexKey::new(predicates))
135 }
136
137 pub fn mem_usage(&self) -> usize {
139 match self {
140 Self::Fulltext(key) => key.mem_usage,
141 Self::Bloom(key) => key.mem_usage,
142 Self::Inverted(key) => key.mem_usage,
143 }
144 }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
149pub struct FulltextIndexKey {
150 predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
151 mem_usage: usize,
152}
153
154impl FulltextIndexKey {
155 pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
158 let mem_usage = predicates
159 .values()
160 .map(|request| {
161 let query_size = request
162 .queries
163 .iter()
164 .map(|query| query.0.len() + size_of::<FulltextQuery>())
165 .sum::<usize>();
166 let term_size = request
167 .terms
168 .iter()
169 .map(|term| term.term.len() + size_of::<FulltextTerm>())
170 .sum::<usize>();
171 query_size + term_size
172 })
173 .sum();
174 Self {
175 predicates,
176 mem_usage,
177 }
178 }
179}
180
181#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
183pub struct BloomFilterKey {
184 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
185 mem_usage: usize,
186}
187
188impl BloomFilterKey {
189 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
192 let mem_usage = predicates
193 .values()
194 .map(|predicates| {
195 predicates
196 .iter()
197 .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
198 .sum::<usize>()
199 })
200 .sum();
201 Self {
202 predicates,
203 mem_usage,
204 }
205 }
206}
207
208#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
210pub struct InvertedIndexKey {
211 predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
212 mem_usage: usize,
213}
214
215impl InvertedIndexKey {
216 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
219 let mem_usage = predicates
220 .values()
221 .map(|predicates| {
222 predicates
223 .iter()
224 .map(|predicate| match predicate {
225 Predicate::InList(predicate) => {
226 predicate.list.iter().map(|list| list.len()).sum::<usize>()
227 }
228 Predicate::Range(_) => size_of::<RangePredicate>(),
229 Predicate::RegexMatch(predicate) => predicate.pattern.len(),
230 })
231 .sum::<usize>()
232 })
233 .sum();
234
235 Self {
236 predicates,
237 mem_usage,
238 }
239 }
240}
241
#[cfg(test)]
// `vec![5..15]` in `test_index_result_cache_weight` is deliberately a vector
// that holds a single range.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A stored entry can be read back, and a different file id misses.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // An empty predicate map is enough to exercise put/get.
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // Same predicates, different file: must not hit.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// With a capacity of 100, the first entry is gone after a second one is
    /// inserted and pending tasks have run.
    #[test]
    fn test_cache_capacity_limit() {
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // First key: long query strings so the weight exceeds the capacity.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        // Second key: same shape, different query text.
        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Each entry on its own weighs more than the configured capacity.
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        cache.put(key1.clone(), file_id1, selection1.clone());

        // The first entry is readable right after insertion.
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        cache.put(key2.clone(), file_id2, selection2.clone());

        // The second entry is readable right after insertion.
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Run the cache's pending tasks before checking for the eviction.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The weight equals key memory usage plus selection memory usage for
    /// every key variant.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty fulltext key with a default selection weighs nothing.
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Fulltext key with one query and one term.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Bloom filter key with a two-item in-list predicate.
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Inverted index key with an unbounded range predicate.
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}