1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26 FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
30const INDEX_RESULT_TYPE: &str = "index_result";
31
32pub struct IndexResultCache {
39 cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
40}
41
42impl IndexResultCache {
43 pub fn new(capacity: u64) -> Self {
45 fn to_str(cause: RemovalCause) -> &'static str {
46 match cause {
47 RemovalCause::Expired => "expired",
48 RemovalCause::Explicit => "explicit",
49 RemovalCause::Replaced => "replaced",
50 RemovalCause::Size => "size",
51 }
52 }
53
54 let cache = Cache::builder()
55 .max_capacity(capacity)
56 .weigher(Self::index_result_cache_weight)
57 .eviction_listener(|k, v, cause| {
58 let size = Self::index_result_cache_weight(&k, &v);
59 CACHE_BYTES
60 .with_label_values(&[INDEX_RESULT_TYPE])
61 .sub(size.into());
62 CACHE_EVICTION
63 .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64 .inc();
65 })
66 .support_invalidation_closures()
67 .build();
68 Self { cache }
69 }
70
71 pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75 let key = (key, file_id);
76 let size = Self::index_result_cache_weight(&key, &result);
77 CACHE_BYTES
78 .with_label_values(&[INDEX_RESULT_TYPE])
79 .add(size.into());
80 self.cache.insert(key, result);
81 }
82
83 pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88 let res = self.cache.get(&(key.clone(), file_id));
89 if res.is_some() {
90 CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91 } else {
92 CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93 }
94 res
95 }
96
97 fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99 k.0.mem_usage() as u32 + v.mem_usage() as u32
100 }
101
102 pub fn invalidate_file(&self, file_id: FileId) {
104 self.cache
105 .invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id)
106 .expect("cache should support invalidation closures");
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Hash)]
112pub enum PredicateKey {
113 Fulltext(FulltextIndexKey),
115 Bloom(BloomFilterKey),
117 Inverted(InvertedIndexKey),
119 MinMax(MinMaxKey),
121}
122
123impl PredicateKey {
124 pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
126 Self::Fulltext(FulltextIndexKey::new(predicates))
127 }
128
129 pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
131 Self::Bloom(BloomFilterKey::new(predicates))
132 }
133
134 pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
136 Self::Inverted(InvertedIndexKey::new(predicates))
137 }
138
139 pub fn new_minmax(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
141 Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields))
142 }
143
144 pub fn mem_usage(&self) -> usize {
146 match self {
147 Self::Fulltext(key) => key.mem_usage,
148 Self::Bloom(key) => key.mem_usage,
149 Self::Inverted(key) => key.mem_usage,
150 Self::MinMax(key) => key.mem_usage,
151 }
152 }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
157pub struct FulltextIndexKey {
158 predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
159 mem_usage: usize,
160}
161
162impl FulltextIndexKey {
163 pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
166 let mem_usage = predicates
167 .values()
168 .map(|request| {
169 let query_size = request
170 .queries
171 .iter()
172 .map(|query| query.0.len() + size_of::<FulltextQuery>())
173 .sum::<usize>();
174 let term_size = request
175 .terms
176 .iter()
177 .map(|term| term.term.len() + size_of::<FulltextTerm>())
178 .sum::<usize>();
179 query_size + term_size
180 })
181 .sum();
182 Self {
183 predicates,
184 mem_usage,
185 }
186 }
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
191pub struct BloomFilterKey {
192 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
193 mem_usage: usize,
194}
195
196impl BloomFilterKey {
197 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
200 let mem_usage = predicates
201 .values()
202 .map(|predicates| {
203 predicates
204 .iter()
205 .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
206 .sum::<usize>()
207 })
208 .sum();
209 Self {
210 predicates,
211 mem_usage,
212 }
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
218pub struct InvertedIndexKey {
219 predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
220 mem_usage: usize,
221}
222
223impl InvertedIndexKey {
224 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
227 let mem_usage = predicates
228 .values()
229 .map(|predicates| {
230 predicates
231 .iter()
232 .map(|predicate| match predicate {
233 Predicate::InList(predicate) => {
234 predicate.list.iter().map(|list| list.len()).sum::<usize>()
235 }
236 Predicate::Range(_) => size_of::<RangePredicate>(),
237 Predicate::RegexMatch(predicate) => predicate.pattern.len(),
238 })
239 .sum::<usize>()
240 })
241 .sum();
242
243 Self {
244 predicates,
245 mem_usage,
246 }
247 }
248}
249
/// Cache key for min-max results.
///
/// Holds the expression strings, the schema version and the `skip_fields`
/// flag. All three take part in equality and hashing, so keys that differ in
/// any of them are distinct.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MinMaxKey {
    exprs: Arc<Vec<String>>,
    schema_version: u64,
    skip_fields: bool,
    mem_usage: usize,
}

impl MinMaxKey {
    /// Creates a key and estimates its memory usage.
    ///
    /// The estimate is the inline size of the key, plus the `Vec` header, plus
    /// one `String` header per expression, plus the byte length of all
    /// expression texts.
    pub fn new(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
        let text_bytes: usize = exprs.iter().map(String::len).sum();
        let header_bytes = size_of::<Vec<String>>() + size_of::<String>() * exprs.len();
        let mem_usage = size_of::<Self>() + header_bytes + text_bytes;
        Self {
            exprs,
            schema_version,
            skip_fields,
            mem_usage,
        }
    }
}
273
#[cfg(test)]
// `test_index_result_cache_weight` builds a row selection from a single range
// (`vec![(0, vec![5..15])]`) on purpose; this clippy lint would otherwise flag it.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A value put into the cache can be read back for the same (key, file)
    /// pair and is not visible under a different file id.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // An empty fulltext predicate map is enough to build a key.
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        cache.put(key.clone(), file_id, selection.clone());
        // Same key and same file id: expect a hit carrying the same row count.
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // The file id is part of the cache key, so a different file misses.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// Min-max keys built from the same expressions must differ when either
    /// the schema version or the `skip_fields` flag differs.
    #[test]
    fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() {
        let exprs = Arc::new(vec!["col > 1".to_string()]);

        let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false);
        let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false);
        assert_ne!(key1, key2);

        let key3 = PredicateKey::new_minmax(exprs, 1, true);
        assert_ne!(key1, key3);
    }

    /// Entries heavier than the cache capacity are readable right after insertion
    /// but gone once the cache's pending maintenance tasks have run.
    #[test]
    fn test_cache_capacity_limit() {
        // Capacity of 100 weight units; both entries below weigh more than that.
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // Long query strings make the key weight exceed the capacity.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Precondition: each entry alone is heavier than the whole cache.
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        cache.put(key1.clone(), file_id1, selection1.clone());

        // No maintenance has run yet, so the first entry is still readable.
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        cache.put(key2.clone(), file_id2, selection2.clone());

        // Likewise for the second entry.
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Flush pending work so size-based eviction is applied; this test
        // relies on that ordering.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The cache weight equals the key's `mem_usage()` plus the selection's
    /// `mem_usage()` for fulltext, bloom filter and inverted index keys.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty key and default selection: the weight is expected to be zero.
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Fulltext key with one query and one term.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Bloom filter key with a two-value in-list predicate.
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Inverted index key with an unbounded range predicate, which still
        // counts `size_of::<RangePredicate>()`.
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}