mito2/cache/index/
result_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26    FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
30const INDEX_RESULT_TYPE: &str = "index_result";
31
32/// Cache for storing index query results.
33///
34/// The `RowGroupSelection` is a collection of row groups that match the predicate.
35///
36/// Row groups can be partially searched. Row groups that not contained in `RowGroupSelection` are not searched.
37/// User can retrieve the partial results and handle uncontained row groups required by the predicate subsequently.
38pub struct IndexResultCache {
39    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
40}
41
42impl IndexResultCache {
43    /// Creates a new cache with the given capacity.
44    pub fn new(capacity: u64) -> Self {
45        fn to_str(cause: RemovalCause) -> &'static str {
46            match cause {
47                RemovalCause::Expired => "expired",
48                RemovalCause::Explicit => "explicit",
49                RemovalCause::Replaced => "replaced",
50                RemovalCause::Size => "size",
51            }
52        }
53
54        let cache = Cache::builder()
55            .max_capacity(capacity)
56            .weigher(Self::index_result_cache_weight)
57            .eviction_listener(|k, v, cause| {
58                let size = Self::index_result_cache_weight(&k, &v);
59                CACHE_BYTES
60                    .with_label_values(&[INDEX_RESULT_TYPE])
61                    .sub(size.into());
62                CACHE_EVICTION
63                    .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64                    .inc();
65            })
66            .support_invalidation_closures()
67            .build();
68        Self { cache }
69    }
70
71    /// Puts a query result into the cache.
72    ///
73    /// Allow user to put a partial result (not containing all row groups) into the cache.
74    pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75        let key = (key, file_id);
76        let size = Self::index_result_cache_weight(&key, &result);
77        CACHE_BYTES
78            .with_label_values(&[INDEX_RESULT_TYPE])
79            .add(size.into());
80        self.cache.insert(key, result);
81    }
82
83    /// Gets a query result from the cache.
84    ///
85    /// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
86    ///       Caller should handle the uncontained row groups required by the predicate subsequently.
87    pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88        let res = self.cache.get(&(key.clone(), file_id));
89        if res.is_some() {
90            CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91        } else {
92            CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93        }
94        res
95    }
96
97    /// Calculates the memory usage of a cache entry.
98    fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99        k.0.mem_usage() as u32 + v.mem_usage() as u32
100    }
101
102    /// Removes cached results for the given file.
103    pub fn invalidate_file(&self, file_id: FileId) {
104        self.cache
105            .invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id)
106            .expect("cache should support invalidation closures");
107    }
108}
109
110/// Key for different types of index predicates.
111#[derive(Debug, Clone, PartialEq, Eq, Hash)]
112pub enum PredicateKey {
113    /// Fulltext index predicate.
114    Fulltext(FulltextIndexKey),
115    /// Bloom filter predicate.
116    Bloom(BloomFilterKey),
117    /// Inverted index predicate.
118    Inverted(InvertedIndexKey),
119    /// Min-max pruning predicate.
120    MinMax(MinMaxKey),
121}
122
123impl PredicateKey {
124    /// Creates a new fulltext index key.
125    pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
126        Self::Fulltext(FulltextIndexKey::new(predicates))
127    }
128
129    /// Creates a new bloom filter key.
130    pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
131        Self::Bloom(BloomFilterKey::new(predicates))
132    }
133
134    /// Creates a new inverted index key.
135    pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
136        Self::Inverted(InvertedIndexKey::new(predicates))
137    }
138
139    /// Creates a new min-max pruning key.
140    pub fn new_minmax(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
141        Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields))
142    }
143
144    /// Returns the memory usage of this key.
145    pub fn mem_usage(&self) -> usize {
146        match self {
147            Self::Fulltext(key) => key.mem_usage,
148            Self::Bloom(key) => key.mem_usage,
149            Self::Inverted(key) => key.mem_usage,
150            Self::MinMax(key) => key.mem_usage,
151        }
152    }
153}
154
155/// Key for fulltext index queries.
156#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
157pub struct FulltextIndexKey {
158    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
159    mem_usage: usize,
160}
161
162impl FulltextIndexKey {
163    /// Creates a new fulltext index key with the given predicates.
164    /// Calculates memory usage based on the size of queries and terms.
165    pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
166        let mem_usage = predicates
167            .values()
168            .map(|request| {
169                let query_size = request
170                    .queries
171                    .iter()
172                    .map(|query| query.0.len() + size_of::<FulltextQuery>())
173                    .sum::<usize>();
174                let term_size = request
175                    .terms
176                    .iter()
177                    .map(|term| term.term.len() + size_of::<FulltextTerm>())
178                    .sum::<usize>();
179                query_size + term_size
180            })
181            .sum();
182        Self {
183            predicates,
184            mem_usage,
185        }
186    }
187}
188
189/// Key for bloom filter queries.
190#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
191pub struct BloomFilterKey {
192    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
193    mem_usage: usize,
194}
195
196impl BloomFilterKey {
197    /// Creates a new bloom filter key with the given predicates.
198    /// Calculates memory usage based on the size of predicate lists.
199    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
200        let mem_usage = predicates
201            .values()
202            .map(|predicates| {
203                predicates
204                    .iter()
205                    .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
206                    .sum::<usize>()
207            })
208            .sum();
209        Self {
210            predicates,
211            mem_usage,
212        }
213    }
214}
215
216/// Key for inverted index queries.
217#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
218pub struct InvertedIndexKey {
219    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
220    mem_usage: usize,
221}
222
223impl InvertedIndexKey {
224    /// Creates a new inverted index key with the given predicates.
225    /// Calculates memory usage based on the type and size of predicates.
226    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
227        let mem_usage = predicates
228            .values()
229            .map(|predicates| {
230                predicates
231                    .iter()
232                    .map(|predicate| match predicate {
233                        Predicate::InList(predicate) => {
234                            predicate.list.iter().map(|list| list.len()).sum::<usize>()
235                        }
236                        Predicate::Range(_) => size_of::<RangePredicate>(),
237                        Predicate::RegexMatch(predicate) => predicate.pattern.len(),
238                    })
239                    .sum::<usize>()
240            })
241            .sum();
242
243        Self {
244            predicates,
245            mem_usage,
246        }
247    }
248}
249
/// Cache key for min-max pruning results.
///
/// Keys built from different expressions, schema versions or `skip_fields` flags compare
/// unequal.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MinMaxKey {
    exprs: Arc<Vec<String>>,
    schema_version: u64,
    skip_fields: bool,
    mem_usage: usize,
}

impl MinMaxKey {
    /// Creates a min-max pruning key.
    ///
    /// The memory estimate covers the key struct itself, the `Vec<String>` header, one
    /// `String` header per expression, and the bytes of every expression.
    pub fn new(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
        let fixed_overhead = size_of::<Self>() + size_of::<Vec<String>>();
        let per_expr_overhead = exprs.len() * size_of::<String>();
        let text_bytes: usize = exprs.iter().map(String::len).sum();
        Self {
            mem_usage: fixed_overhead + per_expr_overhead + text_bytes,
            exprs,
            schema_version,
            skip_fields,
        }
    }
}
273
#[cfg(test)]
// `vec![5..15]` in `test_index_result_cache_weight` deliberately builds a one-element Vec
// holding a single row range; this lint would otherwise flag it as a likely mistake.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // Create a test key and value
        let predicates = BTreeMap::new();
        let key = PredicateKey::new_fulltext(Arc::new(predicates));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Test put and get
        cache.put(key.clone(), file_id, selection.clone());
        let retrieved = cache.get(&key, file_id);
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().as_ref().row_count(),
            selection.as_ref().row_count()
        );

        // Test get non-existent key
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    #[test]
    fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() {
        let exprs = Arc::new(vec!["col > 1".to_string()]);

        let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false);
        let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false);
        assert_ne!(key1, key2);

        let key3 = PredicateKey::new_minmax(exprs, 1, true);
        assert_ne!(key1, key3);
    }

    #[test]
    fn test_cache_capacity_limit() {
        // Create a cache with small capacity (100 bytes)
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        // Create two large keys that will exceed capacity
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 1 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::default());

        let mut predicates2 = BTreeMap::new();
        let request2 = FulltextRequest {
            queries: vec![
                FulltextQuery(
                    "test query 2 with a very long string to ensure large weight".to_string(),
                ),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        predicates2.insert(1, request2);
        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::default());

        // Calculate weights
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
        assert!(weight1 > 100);
        assert!(weight2 > 100);

        // Put first key-value pair
        cache.put(key1.clone(), file_id1, selection1.clone());

        // Verify first key is in cache
        let retrieved1 = cache.get(&key1, file_id1);
        assert!(retrieved1.is_some());
        assert_eq!(
            retrieved1.unwrap().as_ref().row_count(),
            selection1.as_ref().row_count()
        );

        // Put second key-value pair, which should trigger eviction
        cache.put(key2.clone(), file_id2, selection2.clone());

        // Verify second key is in cache
        let retrieved2 = cache.get(&key2, file_id2);
        assert!(retrieved2.is_some());
        assert_eq!(
            retrieved2.unwrap().as_ref().row_count(),
            selection2.as_ref().row_count()
        );

        // Verify first key was evicted
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Test empty values
        let empty_predicates = BTreeMap::new();
        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = IndexResultCache::index_result_cache_weight(
            &(empty_key.clone(), file_id),
            &empty_selection,
        );
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Test 1: FulltextIndexKey
        let mut predicates1 = BTreeMap::new();
        let request1 = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        predicates1.insert(1, request1);
        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 =
            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
        assert!(weight1 > 0);
        assert_eq!(
            weight1,
            key1.mem_usage() as u32 + selection1.mem_usage() as u32
        );

        // Test 2: BloomFilterKey
        let mut predicates2 = BTreeMap::new();
        let predicate2 = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        predicates2.insert(1, vec![predicate2]);
        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 =
            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
        assert!(weight2 > 0);
        assert_eq!(
            weight2,
            key2.mem_usage() as u32 + selection2.mem_usage() as u32
        );

        // Test 3: InvertedIndexKey
        let mut predicates3 = BTreeMap::new();
        let predicate3 = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        predicates3.insert(1, vec![predicate3]);
        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 =
            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
        assert!(weight3 > 0);
        assert_eq!(
            weight3,
            key3.mem_usage() as u32 + selection3.mem_usage() as u32
        );
    }

    #[test]
    fn test_invalidate_file() {
        let cache = IndexResultCache::new(1000);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());

        cache.put(key.clone(), file_id1, selection.clone());
        cache.put(key.clone(), file_id2, selection.clone());

        // Only entries belonging to `file_id1` should be dropped.
        cache.invalidate_file(file_id1);
        cache.cache.run_pending_tasks();

        assert!(cache.get(&key, file_id1).is_none());
        assert!(cache.get(&key, file_id2).is_some());
    }
}