1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26 FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
/// Label value that identifies this cache in the `CACHE_BYTES`, `CACHE_EVICTION`,
/// `CACHE_HIT` and `CACHE_MISS` metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
31
/// Cache of index results.
///
/// Each entry is keyed by a (`PredicateKey`, `FileId`) pair and stores a shared
/// `RowGroupSelection`.
pub struct IndexResultCache {
    /// Backing moka cache holding the selections behind `Arc`s.
    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
41
42impl IndexResultCache {
43 pub fn new(capacity: u64) -> Self {
45 fn to_str(cause: RemovalCause) -> &'static str {
46 match cause {
47 RemovalCause::Expired => "expired",
48 RemovalCause::Explicit => "explicit",
49 RemovalCause::Replaced => "replaced",
50 RemovalCause::Size => "size",
51 }
52 }
53
54 let cache = Cache::builder()
55 .max_capacity(capacity)
56 .weigher(Self::index_result_cache_weight)
57 .eviction_listener(|k, v, cause| {
58 let size = Self::index_result_cache_weight(&k, &v);
59 CACHE_BYTES
60 .with_label_values(&[INDEX_RESULT_TYPE])
61 .sub(size.into());
62 CACHE_EVICTION
63 .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64 .inc();
65 })
66 .build();
67 Self { cache }
68 }
69
70 pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
74 let key = (key, file_id);
75 let size = Self::index_result_cache_weight(&key, &result);
76 CACHE_BYTES
77 .with_label_values(&[INDEX_RESULT_TYPE])
78 .add(size.into());
79 self.cache.insert(key, result);
80 }
81
82 pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
87 let res = self.cache.get(&(key.clone(), file_id));
88 if res.is_some() {
89 CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
90 } else {
91 CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
92 }
93 res
94 }
95
96 fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
98 k.0.mem_usage() as u32 + v.mem_usage() as u32
99 }
100}
101
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// Predicate part of an index result cache key, with one variant per key type.
pub enum PredicateKey {
    /// Key wrapping a `FulltextIndexKey`.
    Fulltext(FulltextIndexKey),
    /// Key wrapping a `BloomFilterKey`.
    Bloom(BloomFilterKey),
    /// Key wrapping an `InvertedIndexKey`.
    Inverted(InvertedIndexKey),
}
112
113impl PredicateKey {
114 pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
116 Self::Fulltext(FulltextIndexKey::new(predicates))
117 }
118
119 pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
121 Self::Bloom(BloomFilterKey::new(predicates))
122 }
123
124 pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
126 Self::Inverted(InvertedIndexKey::new(predicates))
127 }
128
129 pub fn mem_usage(&self) -> usize {
131 match self {
132 Self::Fulltext(key) => key.mem_usage,
133 Self::Bloom(key) => key.mem_usage,
134 Self::Inverted(key) => key.mem_usage,
135 }
136 }
137}
138
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
/// Cache key built from fulltext requests, grouped by column id.
pub struct FulltextIndexKey {
    /// Fulltext requests, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
    /// Memory usage estimate computed once in `new`.
    mem_usage: usize,
}
145
146impl FulltextIndexKey {
147 pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
150 let mem_usage = predicates
151 .values()
152 .map(|request| {
153 let query_size = request
154 .queries
155 .iter()
156 .map(|query| query.0.len() + size_of::<FulltextQuery>())
157 .sum::<usize>();
158 let term_size = request
159 .terms
160 .iter()
161 .map(|term| term.term.len() + size_of::<FulltextTerm>())
162 .sum::<usize>();
163 query_size + term_size
164 })
165 .sum();
166 Self {
167 predicates,
168 mem_usage,
169 }
170 }
171}
172
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
/// Cache key built from bloom filter in-list predicates, grouped by column id.
pub struct BloomFilterKey {
    /// In-list predicates, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
    /// Memory usage estimate computed once in `new`.
    mem_usage: usize,
}
179
180impl BloomFilterKey {
181 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
184 let mem_usage = predicates
185 .values()
186 .map(|predicates| {
187 predicates
188 .iter()
189 .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
190 .sum::<usize>()
191 })
192 .sum();
193 Self {
194 predicates,
195 mem_usage,
196 }
197 }
198}
199
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
/// Cache key built from inverted index predicates, grouped by column id.
pub struct InvertedIndexKey {
    /// Inverted index predicates, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
    /// Memory usage estimate computed once in `new`.
    mem_usage: usize,
}
206
207impl InvertedIndexKey {
208 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
211 let mem_usage = predicates
212 .values()
213 .map(|predicates| {
214 predicates
215 .iter()
216 .map(|predicate| match predicate {
217 Predicate::InList(predicate) => {
218 predicate.list.iter().map(|list| list.len()).sum::<usize>()
219 }
220 Predicate::Range(_) => size_of::<RangePredicate>(),
221 Predicate::RegexMatch(predicate) => predicate.pattern.len(),
222 })
223 .sum::<usize>()
224 })
225 .sum();
226
227 Self {
228 predicates,
229 mem_usage,
230 }
231 }
232}
233
#[cfg(test)]
// `vec![5..15]` in `test_index_result_cache_weight` is deliberately a vector holding one range.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A stored entry can be read back under the same (key, file id) pair and
    /// is not visible under a different file id.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // Build a fulltext key from an empty predicate map.
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Store the selection, then read it back with the same key and file id.
        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // The same key under a different file id must miss.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// With a capacity of 100 and two entries that each weigh more than 100,
    /// the first entry is gone once the cache has run its pending tasks.
    #[test]
    fn test_cache_capacity_limit() {
        let cache = IndexResultCache::new(100);
        // Two distinct files, one per entry.
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        let mut predicates1 = BTreeMap::new();
        // Long query strings make the key weigh more than the cache capacity.
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        let weight1 =
            // Precondition: each entry alone exceeds the capacity of 100.
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        cache.put(key1.clone(), file_id1, selection1.clone());
        // The first entry is readable immediately after insertion.

        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        cache.put(key2.clone(), file_id2, selection2.clone());
        // The second entry is readable immediately after insertion as well.

        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Run the cache's pending tasks before checking that the first key is gone.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The entry weight equals the key's `mem_usage()` plus the selection's
    /// `mem_usage()`, checked for an empty key and for each key variant.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty fulltext key with a default selection: expected weight is zero.
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Fulltext key with one query and one term.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Bloom filter key with a two-item in-list predicate.
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Inverted index key with an unbounded range predicate.
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}