mito2/cache/index/
result_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::ColumnId;
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::file::FileId;
26use crate::sst::index::fulltext_index::applier::builder::{
27    FulltextQuery, FulltextRequest, FulltextTerm,
28};
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Label value that identifies this cache in the `CACHE_*` metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
32
/// Cache for storing index query results.
///
/// Entries are keyed by `(PredicateKey, FileId)`. The cached `RowGroupSelection` is the
/// collection of row groups that match the predicate.
///
/// Row groups can be partially searched: row groups that are not contained in the
/// `RowGroupSelection` have not been searched. Users can retrieve the partial result and
/// subsequently handle the row groups that the predicate requires but the result lacks.
pub struct IndexResultCache {
    /// Underlying cache from `(predicate key, file id)` to the shared selection.
    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
42
43impl IndexResultCache {
44    /// Creates a new cache with the given capacity.
45    pub fn new(capacity: u64) -> Self {
46        fn to_str(cause: RemovalCause) -> &'static str {
47            match cause {
48                RemovalCause::Expired => "expired",
49                RemovalCause::Explicit => "explicit",
50                RemovalCause::Replaced => "replaced",
51                RemovalCause::Size => "size",
52            }
53        }
54
55        let cache = Cache::builder()
56            .max_capacity(capacity)
57            .weigher(Self::index_result_cache_weight)
58            .eviction_listener(|k, v, cause| {
59                let size = Self::index_result_cache_weight(&k, &v);
60                CACHE_BYTES
61                    .with_label_values(&[INDEX_RESULT_TYPE])
62                    .sub(size.into());
63                CACHE_EVICTION
64                    .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
65                    .inc();
66            })
67            .build();
68        Self { cache }
69    }
70
71    /// Puts a query result into the cache.
72    ///
73    /// Allow user to put a partial result (not containing all row groups) into the cache.
74    pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
75        let key = (key, file_id);
76        let size = Self::index_result_cache_weight(&key, &result);
77        CACHE_BYTES
78            .with_label_values(&[INDEX_RESULT_TYPE])
79            .add(size.into());
80        self.cache.insert(key, result);
81    }
82
83    /// Gets a query result from the cache.
84    ///
85    /// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
86    ///       Caller should handle the uncontained row groups required by the predicate subsequently.
87    pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
88        let res = self.cache.get(&(key.clone(), file_id));
89        if res.is_some() {
90            CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
91        } else {
92            CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
93        }
94        res
95    }
96
97    /// Calculates the memory usage of a cache entry.
98    fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
99        k.0.mem_usage() as u32 + v.mem_usage() as u32
100    }
101}
102
103/// Key for different types of index predicates.
104#[derive(Debug, Clone, PartialEq, Eq, Hash)]
105pub enum PredicateKey {
106    /// Fulltext index predicate.
107    Fulltext(FulltextIndexKey),
108    /// Bloom filter predicate.
109    Bloom(BloomFilterKey),
110    /// Inverted index predicate.
111    Inverted(InvertedIndexKey),
112}
113
114impl PredicateKey {
115    /// Creates a new fulltext index key.
116    pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
117        Self::Fulltext(FulltextIndexKey::new(predicates))
118    }
119
120    /// Creates a new bloom filter key.
121    pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
122        Self::Bloom(BloomFilterKey::new(predicates))
123    }
124
125    /// Creates a new inverted index key.
126    pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
127        Self::Inverted(InvertedIndexKey::new(predicates))
128    }
129
130    /// Returns the memory usage of this key.
131    pub fn mem_usage(&self) -> usize {
132        match self {
133            Self::Fulltext(key) => key.mem_usage,
134            Self::Bloom(key) => key.mem_usage,
135            Self::Inverted(key) => key.mem_usage,
136        }
137    }
138}
139
140/// Key for fulltext index queries.
141#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
142pub struct FulltextIndexKey {
143    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
144    mem_usage: usize,
145}
146
147impl FulltextIndexKey {
148    /// Creates a new fulltext index key with the given predicates.
149    /// Calculates memory usage based on the size of queries and terms.
150    pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
151        let mem_usage = predicates
152            .values()
153            .map(|request| {
154                let query_size = request
155                    .queries
156                    .iter()
157                    .map(|query| query.0.len() + size_of::<FulltextQuery>())
158                    .sum::<usize>();
159                let term_size = request
160                    .terms
161                    .iter()
162                    .map(|term| term.term.len() + size_of::<FulltextTerm>())
163                    .sum::<usize>();
164                query_size + term_size
165            })
166            .sum();
167        Self {
168            predicates,
169            mem_usage,
170        }
171    }
172}
173
174/// Key for bloom filter queries.
175#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
176pub struct BloomFilterKey {
177    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
178    mem_usage: usize,
179}
180
181impl BloomFilterKey {
182    /// Creates a new bloom filter key with the given predicates.
183    /// Calculates memory usage based on the size of predicate lists.
184    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
185        let mem_usage = predicates
186            .values()
187            .map(|predicates| {
188                predicates
189                    .iter()
190                    .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
191                    .sum::<usize>()
192            })
193            .sum();
194        Self {
195            predicates,
196            mem_usage,
197        }
198    }
199}
200
201/// Key for inverted index queries.
202#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
203pub struct InvertedIndexKey {
204    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
205    mem_usage: usize,
206}
207
208impl InvertedIndexKey {
209    /// Creates a new inverted index key with the given predicates.
210    /// Calculates memory usage based on the type and size of predicates.
211    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
212        let mem_usage = predicates
213            .values()
214            .map(|predicates| {
215                predicates
216                    .iter()
217                    .map(|predicate| match predicate {
218                        Predicate::InList(predicate) => {
219                            predicate.list.iter().map(|list| list.len()).sum::<usize>()
220                        }
221                        Predicate::Range(_) => size_of::<RangePredicate>(),
222                        Predicate::RegexMatch(predicate) => predicate.pattern.len(),
223                    })
224                    .sum::<usize>()
225            })
226            .sum();
227
228        Self {
229            predicates,
230            mem_usage,
231        }
232    }
233}
234
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// Weight the cache assigns to `key` (paired with `file_id`) and `selection`.
    fn weigh(key: &PredicateKey, file_id: FileId, selection: &Arc<RowGroupSelection>) -> u32 {
        IndexResultCache::index_result_cache_weight(&(key.clone(), file_id), selection)
    }

    /// Weight expected from the separately reported key and selection sizes.
    fn expected_weight(key: &PredicateKey, selection: &RowGroupSelection) -> u32 {
        key.mem_usage() as u32 + selection.mem_usage() as u32
    }

    /// Builds a fulltext key on column 1 with `first_query` and a fixed second query.
    fn long_fulltext_key(first_query: &str) -> PredicateKey {
        let request = FulltextRequest {
            queries: vec![
                FulltextQuery(first_query.to_string()),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        PredicateKey::new_fulltext(Arc::new(BTreeMap::from([(1, request)])))
    }

    /// A stored entry can be read back; a lookup with another file id misses.
    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // Key without any predicates and a small selection as the cached value.
        let key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // Round trip through put and get.
        cache.put(key.clone(), file_id, selection.clone());
        let hit = cache
            .get(&key, file_id)
            .expect("entry should be present right after put");
        assert_eq!(hit.row_count(), selection.row_count());

        // Same predicate key, but a file id that was never inserted.
        let other_file_id = FileId::random();
        assert!(cache.get(&key, other_file_id).is_none());
    }

    /// Entries heavier than the capacity do not stay in the cache.
    #[test]
    fn test_cache_capacity_limit() {
        // Small capacity (100 bytes) that each of the two entries exceeds on its own.
        let cache = IndexResultCache::new(100);
        let (file_id1, file_id2) = (FileId::random(), FileId::random());

        let key1 = long_fulltext_key("test query 1 with a very long string to ensure large weight");
        let selection1 = Arc::new(RowGroupSelection::default());

        let key2 = long_fulltext_key("test query 2 with a very long string to ensure large weight");
        let selection2 = Arc::new(RowGroupSelection::default());

        // Both entries must be heavier than the capacity.
        assert!(weigh(&key1, file_id1, &selection1) > 100);
        assert!(weigh(&key2, file_id2, &selection2) > 100);

        // First entry: insert and read back immediately.
        cache.put(key1.clone(), file_id1, selection1.clone());
        let first = cache
            .get(&key1, file_id1)
            .expect("first entry should be readable right after put");
        assert_eq!(first.row_count(), selection1.row_count());

        // Second entry: its insertion should trigger eviction.
        cache.put(key2.clone(), file_id2, selection2.clone());
        let second = cache
            .get(&key2, file_id2)
            .expect("second entry should be readable right after put");
        assert_eq!(second.row_count(), selection2.row_count());

        // After the pending maintenance ran, the first entry must be gone.
        cache.cache.run_pending_tasks();
        assert!(
            cache.get(&key1, file_id1).is_none(),
            "First key should have been evicted"
        );
    }

    /// The entry weight equals key size plus selection size for every key kind.
    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty key and empty selection weigh nothing.
        let empty_key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = weigh(&empty_key, file_id, &empty_selection);
        assert_eq!(empty_weight, 0);
        assert_eq!(empty_weight, expected_weight(&empty_key, &empty_selection));

        // Test 1: FulltextIndexKey
        let fulltext_request = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        let fulltext_key =
            PredicateKey::new_fulltext(Arc::new(BTreeMap::from([(1, fulltext_request)])));
        let fulltext_selection = Arc::new(RowGroupSelection::new(100, 250));
        let fulltext_weight = weigh(&fulltext_key, file_id, &fulltext_selection);
        assert!(fulltext_weight > 0);
        assert_eq!(
            fulltext_weight,
            expected_weight(&fulltext_key, &fulltext_selection)
        );

        // Test 2: BloomFilterKey
        let bloom_predicate = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        let bloom_key =
            PredicateKey::new_bloom(Arc::new(BTreeMap::from([(1, vec![bloom_predicate])])));
        let bloom_selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let bloom_weight = weigh(&bloom_key, file_id, &bloom_selection);
        assert!(bloom_weight > 0);
        assert_eq!(bloom_weight, expected_weight(&bloom_key, &bloom_selection));

        // Test 3: InvertedIndexKey
        let range_predicate = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        let inverted_key =
            PredicateKey::new_inverted(Arc::new(BTreeMap::from([(1, vec![range_predicate])])));
        let inverted_selection = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let inverted_weight = weigh(&inverted_key, file_id, &inverted_selection);
        assert!(inverted_weight > 0);
        assert_eq!(
            inverted_weight,
            expected_weight(&inverted_key, &inverted_selection)
        );
    }
}