1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::ColumnId;
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::file::FileId;
26use crate::sst::index::fulltext_index::applier::builder::{
27 FulltextQuery, FulltextRequest, FulltextTerm,
28};
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Label value this cache passes to the `CACHE_BYTES`, `CACHE_EVICTION`,
/// `CACHE_HIT` and `CACHE_MISS` metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
32
/// Cache of index results.
///
/// Entries are keyed by a [`PredicateKey`] paired with a [`FileId`] and hold a
/// shared [`RowGroupSelection`].
pub struct IndexResultCache {
    /// Backing cache mapping `(PredicateKey, FileId)` to the cached selection.
    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
42
43impl IndexResultCache {
44 pub fn new(capacity: u64) -> Self {
46 fn to_str(cause: RemovalCause) -> &'static str {
47 match cause {
48 RemovalCause::Expired => "expired",
49 RemovalCause::Explicit => "explicit",
50 RemovalCause::Replaced => "replaced",
51 RemovalCause::Size => "size",
52 }
53 }
54
55 let cache = Cache::builder()
56 .max_capacity(capacity)
57 .weigher(Self::index_result_cache_weight)
58 .eviction_listener(|k, v, cause| {
59 let size = Self::index_result_cache_weight(&k, &v);
60 CACHE_BYTES
61 .with_label_values(&[INDEX_RESULT_TYPE])
62 .sub(size.into());
63 CACHE_EVICTION
64 .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
65 .inc();
66 })
67 .build();
68 Self { cache }
69 }
70
71 pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75 let key = (key, file_id);
76 let size = Self::index_result_cache_weight(&key, &result);
77 CACHE_BYTES
78 .with_label_values(&[INDEX_RESULT_TYPE])
79 .add(size.into());
80 self.cache.insert(key, result);
81 }
82
83 pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88 let res = self.cache.get(&(key.clone(), file_id));
89 if res.is_some() {
90 CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91 } else {
92 CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93 }
94 res
95 }
96
97 fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99 k.0.mem_usage() as u32 + v.mem_usage() as u32
100 }
101}
102
/// Predicate part of an index result cache key, with one variant per kind of
/// key it can wrap.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PredicateKey {
    /// Wraps a [`FulltextIndexKey`].
    Fulltext(FulltextIndexKey),
    /// Wraps a [`BloomFilterKey`].
    Bloom(BloomFilterKey),
    /// Wraps an [`InvertedIndexKey`].
    Inverted(InvertedIndexKey),
}
113
114impl PredicateKey {
115 pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
117 Self::Fulltext(FulltextIndexKey::new(predicates))
118 }
119
120 pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
122 Self::Bloom(BloomFilterKey::new(predicates))
123 }
124
125 pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
127 Self::Inverted(InvertedIndexKey::new(predicates))
128 }
129
130 pub fn mem_usage(&self) -> usize {
132 match self {
133 Self::Fulltext(key) => key.mem_usage,
134 Self::Bloom(key) => key.mem_usage,
135 Self::Inverted(key) => key.mem_usage,
136 }
137 }
138}
139
/// Key built from per-column full-text requests, carrying a size figure
/// computed in `new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct FulltextIndexKey {
    /// Full-text requests indexed by column id.
    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
    /// Size figure computed from `predicates` when the key is constructed.
    mem_usage: usize,
}
146
147impl FulltextIndexKey {
148 pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
151 let mem_usage = predicates
152 .values()
153 .map(|request| {
154 let query_size = request
155 .queries
156 .iter()
157 .map(|query| query.0.len() + size_of::<FulltextQuery>())
158 .sum::<usize>();
159 let term_size = request
160 .terms
161 .iter()
162 .map(|term| term.term.len() + size_of::<FulltextTerm>())
163 .sum::<usize>();
164 query_size + term_size
165 })
166 .sum();
167 Self {
168 predicates,
169 mem_usage,
170 }
171 }
172}
173
/// Key built from per-column in-list predicates, carrying a size figure
/// computed in `new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BloomFilterKey {
    /// In-list predicates indexed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
    /// Size figure computed from `predicates` when the key is constructed.
    mem_usage: usize,
}
180
181impl BloomFilterKey {
182 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
185 let mem_usage = predicates
186 .values()
187 .map(|predicates| {
188 predicates
189 .iter()
190 .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
191 .sum::<usize>()
192 })
193 .sum();
194 Self {
195 predicates,
196 mem_usage,
197 }
198 }
199}
200
/// Key built from per-column predicates, carrying a size figure computed in
/// `new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct InvertedIndexKey {
    /// Predicates indexed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
    /// Size figure computed from `predicates` when the key is constructed.
    mem_usage: usize,
}
207
208impl InvertedIndexKey {
209 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
212 let mem_usage = predicates
213 .values()
214 .map(|predicates| {
215 predicates
216 .iter()
217 .map(|predicate| match predicate {
218 Predicate::InList(predicate) => {
219 predicate.list.iter().map(|list| list.len()).sum::<usize>()
220 }
221 Predicate::Range(_) => size_of::<RangePredicate>(),
222 Predicate::RegexMatch(predicate) => predicate.pattern.len(),
223 })
224 .sum::<usize>()
225 })
226 .sum();
227
228 Self {
229 predicates,
230 mem_usage,
231 }
232 }
233}
234
#[cfg(test)]
// `test_index_result_cache_weight` builds a `vec![5..15]`, a one-element vector
// holding a range, which is what this lint is about.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// An entry that was put can be read back under the same key and file id,
    /// while a different file id misses.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // A key built from an empty predicate map.
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Round trip: put, then get with the same key and file id.
        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // Same predicate key but a file id that was never inserted.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// With a capacity of 100 and entries that each weigh more than 100, the
    /// first entry is expected to be gone once pending cache tasks have run.
    #[test]
    fn test_cache_capacity_limit() {
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // First key: long query strings so its weight exceeds the capacity.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        // Second key: same shape, different first query text.
        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Precondition of the test: each entry alone is heavier than the capacity.
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        // Insert the first entry.
        cache.put(key1.clone(), file_id1, selection1.clone());

        // It is readable right after the put, before pending tasks are run.
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        // Insert the second entry.
        cache.put(key2.clone(), file_id2, selection2.clone());

        // It is likewise readable right after the put.
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Run the cache's pending tasks before checking the first entry again.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The weight of an entry equals the key's `mem_usage()` plus the
    /// selection's `mem_usage()`, checked for each kind of predicate key.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty full-text key with a default selection: weight is zero.
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Full-text key with one query and one term.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Bloom filter key with a two-item in-list predicate.
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Inverted index key with a single unbounded range predicate.
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}