mito2/cache/index/
result_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26    FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
/// Label value that identifies this cache in the `CACHE_BYTES`, `CACHE_EVICTION`,
/// `CACHE_HIT` and `CACHE_MISS` metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
31
/// Cache for storing index query results.
///
/// The `RowGroupSelection` is a collection of row groups that match the predicate.
///
/// Row groups can be partially searched. Row groups that are not contained in the
/// `RowGroupSelection` have not been searched. Users can retrieve the partial results
/// and subsequently handle the uncontained row groups required by the predicate.
pub struct IndexResultCache {
    /// Underlying moka cache, keyed by the predicate key together with the file id.
    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
41
42impl IndexResultCache {
43    /// Creates a new cache with the given capacity.
44    pub fn new(capacity: u64) -> Self {
45        fn to_str(cause: RemovalCause) -> &'static str {
46            match cause {
47                RemovalCause::Expired => "expired",
48                RemovalCause::Explicit => "explicit",
49                RemovalCause::Replaced => "replaced",
50                RemovalCause::Size => "size",
51            }
52        }
53
54        let cache = Cache::builder()
55            .max_capacity(capacity)
56            .weigher(Self::index_result_cache_weight)
57            .eviction_listener(|k, v, cause| {
58                let size = Self::index_result_cache_weight(&k, &v);
59                CACHE_BYTES
60                    .with_label_values(&[INDEX_RESULT_TYPE])
61                    .sub(size.into());
62                CACHE_EVICTION
63                    .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64                    .inc();
65            })
66            .support_invalidation_closures()
67            .build();
68        Self { cache }
69    }
70
71    /// Puts a query result into the cache.
72    ///
73    /// Allow user to put a partial result (not containing all row groups) into the cache.
74    pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75        let key = (key, file_id);
76        let size = Self::index_result_cache_weight(&key, &result);
77        CACHE_BYTES
78            .with_label_values(&[INDEX_RESULT_TYPE])
79            .add(size.into());
80        self.cache.insert(key, result);
81    }
82
83    /// Gets a query result from the cache.
84    ///
85    /// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
86    ///       Caller should handle the uncontained row groups required by the predicate subsequently.
87    pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88        let res = self.cache.get(&(key.clone(), file_id));
89        if res.is_some() {
90            CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91        } else {
92            CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93        }
94        res
95    }
96
97    /// Calculates the memory usage of a cache entry.
98    fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99        k.0.mem_usage() as u32 + v.mem_usage() as u32
100    }
101
102    /// Removes cached results for the given file.
103    pub fn invalidate_file(&self, file_id: FileId) {
104        self.cache
105            .invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id)
106            .expect("cache should support invalidation closures");
107    }
108}
109
/// Key for different types of index predicates.
///
/// Each variant wraps the key type of one kind of index. The derived `Eq` and `Hash`
/// implementations allow it to be used as part of a cache key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PredicateKey {
    /// Fulltext index predicate.
    Fulltext(FulltextIndexKey),
    /// Bloom filter predicate.
    Bloom(BloomFilterKey),
    /// Inverted index predicate.
    Inverted(InvertedIndexKey),
}
120
121impl PredicateKey {
122    /// Creates a new fulltext index key.
123    pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
124        Self::Fulltext(FulltextIndexKey::new(predicates))
125    }
126
127    /// Creates a new bloom filter key.
128    pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
129        Self::Bloom(BloomFilterKey::new(predicates))
130    }
131
132    /// Creates a new inverted index key.
133    pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
134        Self::Inverted(InvertedIndexKey::new(predicates))
135    }
136
137    /// Returns the memory usage of this key.
138    pub fn mem_usage(&self) -> usize {
139        match self {
140            Self::Fulltext(key) => key.mem_usage,
141            Self::Bloom(key) => key.mem_usage,
142            Self::Inverted(key) => key.mem_usage,
143        }
144    }
145}
146
/// Key for fulltext index queries.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct FulltextIndexKey {
    /// Fulltext requests keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
    /// Size computed once in `new` from the queries and terms of all requests.
    mem_usage: usize,
}
153
154impl FulltextIndexKey {
155    /// Creates a new fulltext index key with the given predicates.
156    /// Calculates memory usage based on the size of queries and terms.
157    pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
158        let mem_usage = predicates
159            .values()
160            .map(|request| {
161                let query_size = request
162                    .queries
163                    .iter()
164                    .map(|query| query.0.len() + size_of::<FulltextQuery>())
165                    .sum::<usize>();
166                let term_size = request
167                    .terms
168                    .iter()
169                    .map(|term| term.term.len() + size_of::<FulltextTerm>())
170                    .sum::<usize>();
171                query_size + term_size
172            })
173            .sum();
174        Self {
175            predicates,
176            mem_usage,
177        }
178    }
179}
180
/// Key for bloom filter queries.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BloomFilterKey {
    /// In-list predicates keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
    /// Size computed once in `new` as the summed length of all list items.
    mem_usage: usize,
}
187
188impl BloomFilterKey {
189    /// Creates a new bloom filter key with the given predicates.
190    /// Calculates memory usage based on the size of predicate lists.
191    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
192        let mem_usage = predicates
193            .values()
194            .map(|predicates| {
195                predicates
196                    .iter()
197                    .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
198                    .sum::<usize>()
199            })
200            .sum();
201        Self {
202            predicates,
203            mem_usage,
204        }
205    }
206}
207
/// Key for inverted index queries.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct InvertedIndexKey {
    /// Inverted index predicates keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
    /// Size computed once in `new` from the kind and contents of each predicate.
    mem_usage: usize,
}
214
215impl InvertedIndexKey {
216    /// Creates a new inverted index key with the given predicates.
217    /// Calculates memory usage based on the type and size of predicates.
218    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
219        let mem_usage = predicates
220            .values()
221            .map(|predicates| {
222                predicates
223                    .iter()
224                    .map(|predicate| match predicate {
225                        Predicate::InList(predicate) => {
226                            predicate.list.iter().map(|list| list.len()).sum::<usize>()
227                        }
228                        Predicate::Range(_) => size_of::<RangePredicate>(),
229                        Predicate::RegexMatch(predicate) => predicate.pattern.len(),
230                    })
231                    .sum::<usize>()
232            })
233            .sum();
234
235        Self {
236            predicates,
237            mem_usage,
238        }
239    }
240}
241
// Unit tests for `IndexResultCache` and the weight computation of its keys.
#[cfg(test)]
// `vec![5..15]` below is intentionally a vector holding a single range.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A value that was put can be read back, and a lookup with a different file id misses.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // Create a test key and value
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Test put and get
        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // Test get non-existent key
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    /// Entries whose weight exceeds the cache capacity do not survive pending maintenance.
    #[test]
    fn test_cache_capacity_limit() {
        // Create a cache with small capacity (100 bytes)
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // Create two large keys that will exceed capacity
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Calculate weights
        // Each entry on its own must outweigh the capacity of 100 configured above.
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        // Put first key-value pair
        cache.put(key1.clone(), file_id1, selection1.clone());

        // Verify first key is in cache
        // NOTE(review): this lookup presumably succeeds only because moka applies
        // size-based eviction lazily, i.e. not before `run_pending_tasks` — confirm
        // against the moka documentation.
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        // Put second key-value pair, which should trigger eviction
        cache.put(key2.clone(), file_id2, selection2.clone());

        // Verify second key is in cache
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Verify first key was evicted
        // Flush moka's pending maintenance work before checking for the eviction.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    /// The entry weight equals key memory usage plus selection memory usage for every key kind.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Test empty values
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Test 1: FulltextIndexKey
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Test 2: BloomFilterKey
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Test 3: InvertedIndexKey
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }
}