mito2/cache/index/
result_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::{ColumnId, FileId};
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::index::fulltext_index::applier::builder::{
26    FulltextQuery, FulltextRequest, FulltextTerm,
27};
28use crate::sst::parquet::row_selection::RowGroupSelection;
29
/// Label value used for the index result cache when reporting the cache
/// bytes, eviction, hit and miss metrics.
const INDEX_RESULT_TYPE: &str = "index_result";
31
32/// Cache for storing index query results.
33///
34/// The `RowGroupSelection` is a collection of row groups that match the predicate.
35///
36/// Row groups can be partially searched. Row groups that not contained in `RowGroupSelection` are not searched.
37/// User can retrieve the partial results and handle uncontained row groups required by the predicate subsequently.
38pub struct IndexResultCache {
39    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
40}
41
42impl IndexResultCache {
43    /// Creates a new cache with the given capacity.
44    pub fn new(capacity: u64) -> Self {
45        fn to_str(cause: RemovalCause) -> &'static str {
46            match cause {
47                RemovalCause::Expired => "expired",
48                RemovalCause::Explicit => "explicit",
49                RemovalCause::Replaced => "replaced",
50                RemovalCause::Size => "size",
51            }
52        }
53
54        let cache = Cache::builder()
55            .max_capacity(capacity)
56            .weigher(Self::index_result_cache_weight)
57            .eviction_listener(|k, v, cause| {
58                let size = Self::index_result_cache_weight(&k, &v);
59                CACHE_BYTES
60                    .with_label_values(&[INDEX_RESULT_TYPE])
61                    .sub(size.into());
62                CACHE_EVICTION
63                    .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
64                    .inc();
65            })
66            .build();
67        Self { cache }
68    }
69
70    /// Puts a query result into the cache.
71    ///
72    /// Allow user to put a partial result (not containing all row groups) into the cache.
73    pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
74        let key = (key, file_id);
75        let size = Self::index_result_cache_weight(&key, &result);
76        CACHE_BYTES
77            .with_label_values(&[INDEX_RESULT_TYPE])
78            .add(size.into());
79        self.cache.insert(key, result);
80    }
81
82    /// Gets a query result from the cache.
83    ///
84    /// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
85    ///       Caller should handle the uncontained row groups required by the predicate subsequently.
86    pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
87        let res = self.cache.get(&(key.clone(), file_id));
88        if res.is_some() {
89            CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
90        } else {
91            CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
92        }
93        res
94    }
95
96    /// Calculates the memory usage of a cache entry.
97    fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
98        k.0.mem_usage() as u32 + v.mem_usage() as u32
99    }
100}
101
102/// Key for different types of index predicates.
103#[derive(Debug, Clone, PartialEq, Eq, Hash)]
104pub enum PredicateKey {
105    /// Fulltext index predicate.
106    Fulltext(FulltextIndexKey),
107    /// Bloom filter predicate.
108    Bloom(BloomFilterKey),
109    /// Inverted index predicate.
110    Inverted(InvertedIndexKey),
111}
112
113impl PredicateKey {
114    /// Creates a new fulltext index key.
115    pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
116        Self::Fulltext(FulltextIndexKey::new(predicates))
117    }
118
119    /// Creates a new bloom filter key.
120    pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
121        Self::Bloom(BloomFilterKey::new(predicates))
122    }
123
124    /// Creates a new inverted index key.
125    pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
126        Self::Inverted(InvertedIndexKey::new(predicates))
127    }
128
129    /// Returns the memory usage of this key.
130    pub fn mem_usage(&self) -> usize {
131        match self {
132            Self::Fulltext(key) => key.mem_usage,
133            Self::Bloom(key) => key.mem_usage,
134            Self::Inverted(key) => key.mem_usage,
135        }
136    }
137}
138
139/// Key for fulltext index queries.
140#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
141pub struct FulltextIndexKey {
142    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
143    mem_usage: usize,
144}
145
146impl FulltextIndexKey {
147    /// Creates a new fulltext index key with the given predicates.
148    /// Calculates memory usage based on the size of queries and terms.
149    pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
150        let mem_usage = predicates
151            .values()
152            .map(|request| {
153                let query_size = request
154                    .queries
155                    .iter()
156                    .map(|query| query.0.len() + size_of::<FulltextQuery>())
157                    .sum::<usize>();
158                let term_size = request
159                    .terms
160                    .iter()
161                    .map(|term| term.term.len() + size_of::<FulltextTerm>())
162                    .sum::<usize>();
163                query_size + term_size
164            })
165            .sum();
166        Self {
167            predicates,
168            mem_usage,
169        }
170    }
171}
172
173/// Key for bloom filter queries.
174#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
175pub struct BloomFilterKey {
176    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
177    mem_usage: usize,
178}
179
180impl BloomFilterKey {
181    /// Creates a new bloom filter key with the given predicates.
182    /// Calculates memory usage based on the size of predicate lists.
183    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
184        let mem_usage = predicates
185            .values()
186            .map(|predicates| {
187                predicates
188                    .iter()
189                    .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
190                    .sum::<usize>()
191            })
192            .sum();
193        Self {
194            predicates,
195            mem_usage,
196        }
197    }
198}
199
200/// Key for inverted index queries.
201#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
202pub struct InvertedIndexKey {
203    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
204    mem_usage: usize,
205}
206
207impl InvertedIndexKey {
208    /// Creates a new inverted index key with the given predicates.
209    /// Calculates memory usage based on the type and size of predicates.
210    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
211        let mem_usage = predicates
212            .values()
213            .map(|predicates| {
214                predicates
215                    .iter()
216                    .map(|predicate| match predicate {
217                        Predicate::InList(predicate) => {
218                            predicate.list.iter().map(|list| list.len()).sum::<usize>()
219                        }
220                        Predicate::Range(_) => size_of::<RangePredicate>(),
221                        Predicate::RegexMatch(predicate) => predicate.pattern.len(),
222                    })
223                    .sum::<usize>()
224            })
225            .sum();
226
227        Self {
228            predicates,
229            mem_usage,
230        }
231    }
232}
233
234#[cfg(test)]
235#[allow(clippy::single_range_in_vec_init)]
236mod tests {
237    use std::collections::{BTreeMap, BTreeSet};
238    use std::sync::Arc;
239
240    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
241    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};
242
243    use super::*;
244    use crate::sst::index::fulltext_index::applier::builder::{
245        FulltextQuery, FulltextRequest, FulltextTerm,
246    };
247    use crate::sst::parquet::row_selection::RowGroupSelection;
248
249    #[test]
250    fn test_cache_basic_operations() {
251        let cache = IndexResultCache::new(1000);
252        let file_id = FileId::random();
253
254        // Create a test key and value
255        let predicates = BTreeMap::new();
256        let key = PredicateKey::new_fulltext(Arc::new(predicates));
257        let selection = Arc::new(RowGroupSelection::from_row_ids(
258            [1, 2, 3].into_iter().collect(),
259            1,
260            10,
261        ));
262
263        // Test put and get
264        cache.put(key.clone(), file_id, selection.clone());
265        let retrieved = cache.get(&key, file_id);
266        assert!(retrieved.is_some());
267        assert_eq!(
268            retrieved.unwrap().as_ref().row_count(),
269            selection.as_ref().row_count()
270        );
271
272        // Test get non-existent key
273        let non_existent_file_id = FileId::random();
274        assert!(cache.get(&key, non_existent_file_id).is_none());
275    }
276
277    #[test]
278    fn test_cache_capacity_limit() {
279        // Create a cache with small capacity (100 bytes)
280        let cache = IndexResultCache::new(100);
281        let file_id1 = FileId::random();
282        let file_id2 = FileId::random();
283
284        // Create two large keys that will exceed capacity
285        let mut predicates1 = BTreeMap::new();
286        let request1 = FulltextRequest {
287            queries: vec![
288                FulltextQuery(
289                    "test query 1 with a very long string to ensure large weight".to_string(),
290                ),
291                FulltextQuery("another long query string".to_string()),
292            ],
293            terms: vec![],
294        };
295        predicates1.insert(1, request1);
296        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
297        let selection1 = Arc::new(RowGroupSelection::default());
298
299        let mut predicates2 = BTreeMap::new();
300        let request2 = FulltextRequest {
301            queries: vec![
302                FulltextQuery(
303                    "test query 2 with a very long string to ensure large weight".to_string(),
304                ),
305                FulltextQuery("another long query string".to_string()),
306            ],
307            terms: vec![],
308        };
309        predicates2.insert(1, request2);
310        let key2 = PredicateKey::new_fulltext(Arc::new(predicates2));
311        let selection2 = Arc::new(RowGroupSelection::default());
312
313        // Calculate weights
314        let weight1 =
315            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1);
316        let weight2 =
317            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2);
318        assert!(weight1 > 100);
319        assert!(weight2 > 100);
320
321        // Put first key-value pair
322        cache.put(key1.clone(), file_id1, selection1.clone());
323
324        // Verify first key is in cache
325        let retrieved1 = cache.get(&key1, file_id1);
326        assert!(retrieved1.is_some());
327        assert_eq!(
328            retrieved1.unwrap().as_ref().row_count(),
329            selection1.as_ref().row_count()
330        );
331
332        // Put second key-value pair, which should trigger eviction
333        cache.put(key2.clone(), file_id2, selection2.clone());
334
335        // Verify second key is in cache
336        let retrieved2 = cache.get(&key2, file_id2);
337        assert!(retrieved2.is_some());
338        assert_eq!(
339            retrieved2.unwrap().as_ref().row_count(),
340            selection2.as_ref().row_count()
341        );
342
343        // Verify first key was evicted
344        cache.cache.run_pending_tasks();
345        let retrieved1_after_eviction = cache.get(&key1, file_id1);
346        assert!(
347            retrieved1_after_eviction.is_none(),
348            "First key should have been evicted"
349        );
350    }
351
352    #[test]
353    fn test_index_result_cache_weight() {
354        let file_id = FileId::random();
355
356        // Test empty values
357        let empty_predicates = BTreeMap::new();
358        let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates));
359        let empty_selection = Arc::new(RowGroupSelection::default());
360        let empty_weight = IndexResultCache::index_result_cache_weight(
361            &(empty_key.clone(), file_id),
362            &empty_selection,
363        );
364        assert_eq!(empty_weight, 0);
365        assert_eq!(
366            empty_weight,
367            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
368        );
369
370        // Test 1: FulltextIndexKey
371        let mut predicates1 = BTreeMap::new();
372        let request1 = FulltextRequest {
373            queries: vec![FulltextQuery("test query".to_string())],
374            terms: vec![FulltextTerm {
375                col_lowered: false,
376                term: "test term".to_string(),
377            }],
378        };
379        predicates1.insert(1, request1);
380        let key1 = PredicateKey::new_fulltext(Arc::new(predicates1));
381        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
382        let weight1 =
383            IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1);
384        assert!(weight1 > 0);
385        assert_eq!(
386            weight1,
387            key1.mem_usage() as u32 + selection1.mem_usage() as u32
388        );
389
390        // Test 2: BloomFilterKey
391        let mut predicates2 = BTreeMap::new();
392        let predicate2 = BloomInListPredicate {
393            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
394        };
395        predicates2.insert(1, vec![predicate2]);
396        let key2 = PredicateKey::new_bloom(Arc::new(predicates2));
397        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
398            [1, 2, 3].into_iter().collect(),
399            100,
400            1,
401        ));
402        let weight2 =
403            IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2);
404        assert!(weight2 > 0);
405        assert_eq!(
406            weight2,
407            key2.mem_usage() as u32 + selection2.mem_usage() as u32
408        );
409
410        // Test 3: InvertedIndexKey
411        let mut predicates3 = BTreeMap::new();
412        let predicate3 = Predicate::Range(RangePredicate {
413            range: Range {
414                lower: None,
415                upper: None,
416            },
417        });
418        predicates3.insert(1, vec![predicate3]);
419        let key3 = PredicateKey::new_inverted(Arc::new(predicates3));
420        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
421            vec![(0, vec![5..15])],
422            20,
423        ));
424        let weight3 =
425            IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3);
426        assert!(weight3 > 0);
427        assert_eq!(
428            weight3,
429            key3.mem_usage() as u32 + selection3.mem_usage() as u32
430        );
431    }
432}