1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::ColumnId;
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::file::FileId;
26use crate::sst::index::fulltext_index::applier::builder::{
27 FulltextQuery, FulltextRequest, FulltextTerm,
28};
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Label value identifying this cache on the `CACHE_BYTES`, `CACHE_EVICTION`,
/// `CACHE_HIT` and `CACHE_MISS` metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
32
/// Cache of index application results.
///
/// Each entry maps a `(PredicateKey, FileId)` pair to the shared
/// [`RowGroupSelection`] that was stored for that predicate set and file.
pub struct IndexResultCache {
    /// Underlying moka cache; entries are weighed by the estimated memory
    /// usage of the predicate key plus the selection.
    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
37
38impl IndexResultCache {
39 pub fn new(capacity: u64) -> Self {
41 fn to_str(cause: RemovalCause) -> &'static str {
42 match cause {
43 RemovalCause::Expired => "expired",
44 RemovalCause::Explicit => "explicit",
45 RemovalCause::Replaced => "replaced",
46 RemovalCause::Size => "size",
47 }
48 }
49
50 let cache = Cache::builder()
51 .max_capacity(capacity)
52 .weigher(Self::index_result_cache_weight)
53 .eviction_listener(|k, v, cause| {
54 let size = Self::index_result_cache_weight(&k, &v);
55 CACHE_BYTES
56 .with_label_values(&[INDEX_RESULT_TYPE])
57 .sub(size.into());
58 CACHE_EVICTION
59 .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
60 .inc();
61 })
62 .build();
63 Self { cache }
64 }
65
66 pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
68 let key = (key, file_id);
69 let size = Self::index_result_cache_weight(&key, &result);
70 CACHE_BYTES
71 .with_label_values(&[INDEX_RESULT_TYPE])
72 .add(size.into());
73 self.cache.insert(key, result);
74 }
75
76 pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
78 let res = self.cache.get(&(key.clone(), file_id));
79 if res.is_some() {
80 CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
81 } else {
82 CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
83 }
84 res
85 }
86
87 fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
89 k.0.mem_usage() as u32 + v.mem_usage() as u32
90 }
91}
92
/// Predicate half of an [`IndexResultCache`] key, with one variant per index
/// type that can produce a cached result.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PredicateKey {
    /// Key built from fulltext index requests.
    Fulltext(FulltextIndexKey),
    /// Key built from bloom filter in-list predicates.
    Bloom(BloomFilterKey),
    /// Key built from inverted index predicates.
    Inverted(InvertedIndexKey),
}
103
104impl PredicateKey {
105 pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
107 Self::Fulltext(FulltextIndexKey::new(predicates))
108 }
109
110 pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
112 Self::Bloom(BloomFilterKey::new(predicates))
113 }
114
115 pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
117 Self::Inverted(InvertedIndexKey::new(predicates))
118 }
119
120 pub fn mem_usage(&self) -> usize {
122 match self {
123 Self::Fulltext(key) => key.mem_usage,
124 Self::Bloom(key) => key.mem_usage,
125 Self::Inverted(key) => key.mem_usage,
126 }
127 }
128}
129
/// Cache key for fulltext index results.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct FulltextIndexKey {
    /// Fulltext requests, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
    /// Estimated size of the predicates, computed once in `new`.
    mem_usage: usize,
}
136
137impl FulltextIndexKey {
138 pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
141 let mem_usage = predicates
142 .values()
143 .map(|request| {
144 let query_size = request
145 .queries
146 .iter()
147 .map(|query| query.0.len() + size_of::<FulltextQuery>())
148 .sum::<usize>();
149 let term_size = request
150 .terms
151 .iter()
152 .map(|term| term.term.len() + size_of::<FulltextTerm>())
153 .sum::<usize>();
154 query_size + term_size
155 })
156 .sum();
157 Self {
158 predicates,
159 mem_usage,
160 }
161 }
162}
163
/// Cache key for bloom filter index results.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BloomFilterKey {
    /// In-list predicates, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
    /// Estimated size of the predicates, computed once in `new`.
    mem_usage: usize,
}
170
171impl BloomFilterKey {
172 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
175 let mem_usage = predicates
176 .values()
177 .map(|predicates| {
178 predicates
179 .iter()
180 .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
181 .sum::<usize>()
182 })
183 .sum();
184 Self {
185 predicates,
186 mem_usage,
187 }
188 }
189}
190
/// Cache key for inverted index results.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct InvertedIndexKey {
    /// Inverted index predicates, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
    /// Estimated size of the predicates, computed once in `new`.
    mem_usage: usize,
}
197
198impl InvertedIndexKey {
199 pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
202 let mem_usage = predicates
203 .values()
204 .map(|predicates| {
205 predicates
206 .iter()
207 .map(|predicate| match predicate {
208 Predicate::InList(predicate) => {
209 predicate.list.iter().map(|list| list.len()).sum::<usize>()
210 }
211 Predicate::Range(_) => size_of::<RangePredicate>(),
212 Predicate::RegexMatch(predicate) => predicate.pattern.len(),
213 })
214 .sum::<usize>()
215 })
216 .sum();
217
218 Self {
219 predicates,
220 mem_usage,
221 }
222 }
223}
224
// Unit tests for `IndexResultCache` and the weight computation of its keys.
#[cfg(test)]
// `vec![5..15]` in the weight test is intentionally a one-element vec of ranges.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A stored entry can be read back under the same key and file id, and a
    /// different file id misses.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // Key with an empty predicate map.
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Put then get with the same key and file id.
        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // Same predicate key but another file id must not hit.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// With a capacity of 100 and entries that each weigh more than 100, the
    /// first entry is no longer present after the second insert and a
    /// maintenance run.
    #[test]
    fn test_cache_capacity_limit() {
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // First key: long query strings so the weight exceeds the capacity.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        // Second key: same shape, different first query.
        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Sanity check: each entry alone is heavier than the cache capacity.
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        // Insert the first entry.
        cache.put(key1.clone(), file_id1, selection1.clone());

        // It is readable right after the insert.
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        // Insert the second entry.
        cache.put(key2.clone(), file_id2, selection2.clone());

        // It is readable right after the insert as well.
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Run moka's pending maintenance tasks before checking for eviction.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The entry weight equals key memory usage plus selection memory usage
    /// for every key variant, and is zero for an empty key and selection.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty fulltext key with a default selection weighs nothing.
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Fulltext key with one query and one term.
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Bloom filter key with a two-item in-list predicate.
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Inverted index key with an unbounded range predicate.
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}