mito2/cache/index/
result_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use index::bloom_filter::applier::InListPredicate;
19use index::inverted_index::search::predicate::{Predicate, RangePredicate};
20use moka::notification::RemovalCause;
21use moka::sync::Cache;
22use store_api::storage::ColumnId;
23
24use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
25use crate::sst::file::FileId;
26use crate::sst::index::fulltext_index::applier::builder::{
27    FulltextQuery, FulltextRequest, FulltextTerm,
28};
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Label value passed to the cache metrics (`CACHE_BYTES`, `CACHE_EVICTION`,
/// `CACHE_HIT`, `CACHE_MISS`) to identify samples recorded by the index result cache.
const INDEX_RESULT_TYPE: &str = "index_result";
32
33/// Cache for storing index query results.
34pub struct IndexResultCache {
35    cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
36}
37
38impl IndexResultCache {
39    /// Creates a new cache with the given capacity.
40    pub fn new(capacity: u64) -> Self {
41        fn to_str(cause: RemovalCause) -> &'static str {
42            match cause {
43                RemovalCause::Expired => "expired",
44                RemovalCause::Explicit => "explicit",
45                RemovalCause::Replaced => "replaced",
46                RemovalCause::Size => "size",
47            }
48        }
49
50        let cache = Cache::builder()
51            .max_capacity(capacity)
52            .weigher(Self::index_result_cache_weight)
53            .eviction_listener(|k, v, cause| {
54                let size = Self::index_result_cache_weight(&k, &v);
55                CACHE_BYTES
56                    .with_label_values(&[INDEX_RESULT_TYPE])
57                    .sub(size.into());
58                CACHE_EVICTION
59                    .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
60                    .inc();
61            })
62            .build();
63        Self { cache }
64    }
65
66    /// Puts a query result into the cache.
67    pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
68        let key = (key, file_id);
69        let size = Self::index_result_cache_weight(&key, &result);
70        CACHE_BYTES
71            .with_label_values(&[INDEX_RESULT_TYPE])
72            .add(size.into());
73        self.cache.insert(key, result);
74    }
75
76    /// Gets a query result from the cache.
77    pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
78        let res = self.cache.get(&(key.clone(), file_id));
79        if res.is_some() {
80            CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
81        } else {
82            CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
83        }
84        res
85    }
86
87    /// Calculates the memory usage of a cache entry.
88    fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
89        k.0.mem_usage() as u32 + v.mem_usage() as u32
90    }
91}
92
93/// Key for different types of index predicates.
94#[derive(Debug, Clone, PartialEq, Eq, Hash)]
95pub enum PredicateKey {
96    /// Fulltext index predicate.
97    Fulltext(FulltextIndexKey),
98    /// Bloom filter predicate.
99    Bloom(BloomFilterKey),
100    /// Inverted index predicate.
101    Inverted(InvertedIndexKey),
102}
103
104impl PredicateKey {
105    /// Creates a new fulltext index key.
106    pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
107        Self::Fulltext(FulltextIndexKey::new(predicates))
108    }
109
110    /// Creates a new bloom filter key.
111    pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
112        Self::Bloom(BloomFilterKey::new(predicates))
113    }
114
115    /// Creates a new inverted index key.
116    pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
117        Self::Inverted(InvertedIndexKey::new(predicates))
118    }
119
120    /// Returns the memory usage of this key.
121    pub fn mem_usage(&self) -> usize {
122        match self {
123            Self::Fulltext(key) => key.mem_usage,
124            Self::Bloom(key) => key.mem_usage,
125            Self::Inverted(key) => key.mem_usage,
126        }
127    }
128}
129
130/// Key for fulltext index queries.
131#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
132pub struct FulltextIndexKey {
133    predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
134    mem_usage: usize,
135}
136
137impl FulltextIndexKey {
138    /// Creates a new fulltext index key with the given predicates.
139    /// Calculates memory usage based on the size of queries and terms.
140    pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
141        let mem_usage = predicates
142            .values()
143            .map(|request| {
144                let query_size = request
145                    .queries
146                    .iter()
147                    .map(|query| query.0.len() + size_of::<FulltextQuery>())
148                    .sum::<usize>();
149                let term_size = request
150                    .terms
151                    .iter()
152                    .map(|term| term.term.len() + size_of::<FulltextTerm>())
153                    .sum::<usize>();
154                query_size + term_size
155            })
156            .sum();
157        Self {
158            predicates,
159            mem_usage,
160        }
161    }
162}
163
164/// Key for bloom filter queries.
165#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
166pub struct BloomFilterKey {
167    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
168    mem_usage: usize,
169}
170
171impl BloomFilterKey {
172    /// Creates a new bloom filter key with the given predicates.
173    /// Calculates memory usage based on the size of predicate lists.
174    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
175        let mem_usage = predicates
176            .values()
177            .map(|predicates| {
178                predicates
179                    .iter()
180                    .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
181                    .sum::<usize>()
182            })
183            .sum();
184        Self {
185            predicates,
186            mem_usage,
187        }
188    }
189}
190
191/// Key for inverted index queries.
192#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
193pub struct InvertedIndexKey {
194    predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
195    mem_usage: usize,
196}
197
198impl InvertedIndexKey {
199    /// Creates a new inverted index key with the given predicates.
200    /// Calculates memory usage based on the type and size of predicates.
201    pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
202        let mem_usage = predicates
203            .values()
204            .map(|predicates| {
205                predicates
206                    .iter()
207                    .map(|predicate| match predicate {
208                        Predicate::InList(predicate) => {
209                            predicate.list.iter().map(|list| list.len()).sum::<usize>()
210                        }
211                        Predicate::Range(_) => size_of::<RangePredicate>(),
212                        Predicate::RegexMatch(predicate) => predicate.pattern.len(),
213                    })
214                    .sum::<usize>()
215            })
216            .sum();
217
218        Self {
219            predicates,
220            mem_usage,
221        }
222    }
223}
224
#[cfg(test)]
// `vec![5..15]` below is deliberately a one-element vector of ranges.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// Builds a fulltext key with one request on column 1 made of the given
    /// query strings and (non-lowered) terms.
    fn fulltext_key(queries: &[&str], terms: &[&str]) -> PredicateKey {
        let request = FulltextRequest {
            queries: queries
                .iter()
                .map(|text| FulltextQuery(text.to_string()))
                .collect(),
            terms: terms
                .iter()
                .map(|text| FulltextTerm {
                    col_lowered: false,
                    term: text.to_string(),
                })
                .collect(),
        };
        let mut by_column = BTreeMap::new();
        by_column.insert(1, request);
        PredicateKey::new_fulltext(Arc::new(by_column))
    }

    /// Weight the cache assigns to the `(key, file_id) -> selection` entry.
    fn weight_of(key: &PredicateKey, file_id: FileId, selection: &Arc<RowGroupSelection>) -> u32 {
        IndexResultCache::index_result_cache_weight(&(key.clone(), file_id), selection)
    }

    /// Asserts that the entry is cached and has the same row count as `expected`.
    fn assert_cached(
        cache: &IndexResultCache,
        key: &PredicateKey,
        file_id: FileId,
        expected: &RowGroupSelection,
    ) {
        let hit = cache.get(key, file_id);
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().as_ref().row_count(), expected.row_count());
    }

    /// Asserts that the entry weight is positive and equals the sum of the
    /// key's and the selection's memory usage.
    fn assert_positive_weight(
        key: &PredicateKey,
        file_id: FileId,
        selection: &Arc<RowGroupSelection>,
    ) {
        let weight = weight_of(key, file_id, selection);
        assert!(weight > 0);
        assert_eq!(
            weight,
            key.mem_usage() as u32 + selection.mem_usage() as u32
        );
    }

    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let file_id = FileId::random();

        // An empty fulltext key paired with a small selection.
        let key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // A stored entry can be read back.
        cache.put(key.clone(), file_id, selection.clone());
        assert_cached(&cache, &key, file_id, &selection);

        // The same predicates under another file id are a miss.
        let non_existent_file_id = FileId::random();
        assert!(cache.get(&key, non_existent_file_id).is_none());
    }

    #[test]
    fn test_cache_capacity_limit() {
        // Capacity of 100 weight units; each entry below weighs more than that.
        let cache = IndexResultCache::new(100);
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();

        let key1 = fulltext_key(
            &[
                "test query 1 with a very long string to ensure large weight",
                "another long query string",
            ],
            &[],
        );
        let selection1 = Arc::new(RowGroupSelection::default());

        let key2 = fulltext_key(
            &[
                "test query 2 with a very long string to ensure large weight",
                "another long query string",
            ],
            &[],
        );
        let selection2 = Arc::new(RowGroupSelection::default());

        // Both entries individually exceed the capacity.
        assert!(weight_of(&key1, file_id1, &selection1) > 100);
        assert!(weight_of(&key2, file_id2, &selection2) > 100);

        // The first entry is readable right after insertion.
        cache.put(key1.clone(), file_id1, selection1.clone());
        assert_cached(&cache, &key1, file_id1, &selection1);

        // Inserting the second entry should push the first one out.
        cache.put(key2.clone(), file_id2, selection2.clone());
        assert_cached(&cache, &key2, file_id2, &selection2);

        // Let the cache apply pending maintenance before checking the eviction.
        cache.cache.run_pending_tasks();
        let retrieved1_after_eviction = cache.get(&key1, file_id1);
        assert!(
            retrieved1_after_eviction.is_none(),
            "First key should have been evicted"
        );
    }

    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty key and empty selection weigh nothing.
        let empty_key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = weight_of(&empty_key, file_id, &empty_selection);
        assert_eq!(empty_weight, 0);
        assert_eq!(
            empty_weight,
            empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32
        );

        // Case 1: fulltext key with one query and one term.
        let fulltext = fulltext_key(&["test query"], &["test term"]);
        let fulltext_selection = Arc::new(RowGroupSelection::new(100, 250));
        assert_positive_weight(&fulltext, file_id, &fulltext_selection);

        // Case 2: bloom filter key with a two-item in-list predicate.
        let in_list = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        let mut bloom_predicates = BTreeMap::new();
        bloom_predicates.insert(1, vec![in_list]);
        let bloom = PredicateKey::new_bloom(Arc::new(bloom_predicates));
        let bloom_selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        assert_positive_weight(&bloom, file_id, &bloom_selection);

        // Case 3: inverted index key with an unbounded range predicate.
        let unbounded = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        let mut inverted_predicates = BTreeMap::new();
        inverted_predicates.insert(1, vec![unbounded]);
        let inverted = PredicateKey::new_inverted(Arc::new(inverted_predicates));
        let inverted_selection = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        assert_positive_weight(&inverted, file_id, &inverted_selection);
    }
}