// mito2/cache/index.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
// Sub-caches for the individual index kinds.
pub mod bloom_filter_index;
pub mod inverted_index;
pub mod result_cache;
#[cfg(feature = "vector_index")]
pub mod vector_index;
20
21use std::future::Future;
22use std::hash::Hash;
23use std::ops::Range;
24use std::sync::Arc;
25
26use bytes::Bytes;
27use object_store::Buffer;
28
29use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
30
/// Cache-type label value used for index metadata metrics.
const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Cache-type label value used for index content metrics.
const INDEX_CONTENT_TYPE: &str = "index_content";
35
/// Metrics collected from IndexCache operations.
///
/// A plain-data counter bundle; all fields are `Copy`, so the struct derives
/// `Copy`/`PartialEq`/`Eq` to make it cheap to pass around and easy to assert on.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IndexCacheMetrics {
    /// Number of cache hits.
    pub cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
    /// Number of pages accessed.
    pub num_pages: usize,
    /// Total bytes from pages.
    pub page_bytes: u64,
}

impl IndexCacheMetrics {
    /// Merges another set of metrics into this one by summing every counter.
    pub fn merge(&mut self, other: &Self) {
        self.cache_hit += other.cache_hit;
        self.cache_miss += other.cache_miss;
        self.num_pages += other.num_pages;
        self.page_bytes += other.page_bytes;
    }
}
58
/// Key identifying one fixed-size page of a cached index file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Zero-based index of the page within the file.
    page_id: u64,
}

impl PageKey {
    /// Converts an offset to a page ID based on the page size.
    fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
        offset / page_size
    }

    /// Calculates the total number of pages that a given size spans, starting from a specific offset.
    ///
    /// Returns 0 when `size` is 0. The previous implementation computed
    /// `offset + size - 1` unconditionally, which underflows for
    /// `offset == 0 && size == 0` (panic in debug builds, a bogus huge page
    /// count in release builds).
    fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
        if size == 0 {
            // An empty read spans no pages; also avoids the `- 1` underflow.
            return 0;
        }
        let start_page = Self::calculate_page_id(offset, page_size);
        let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
        (end_page + 1 - start_page) as u32
    }

    /// Calculates the byte range for data retrieval based on the specified offset and size.
    ///
    /// This function determines the starting and ending byte positions required for reading data.
    /// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
    /// the resulting byte range will be 904..5904. This indicates that:
    /// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
    /// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
        let start = (offset % page_size) as usize;
        let end = start + size as usize;
        start..end
    }

    /// Generates an iterator of `PageKey` for the pages that a given offset and size span.
    fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator<Item = Self> {
        let start_page = Self::calculate_page_id(offset, page_size);
        let total_pages = Self::calculate_page_count(offset, size, page_size);
        (0..total_pages).map(move |i| Self {
            page_id: start_page + i as u64,
        })
    }
}
99
/// Cache for index metadata and content.
///
/// Content is cached page-by-page (keyed by `(K, PageKey)`) so that partial
/// reads of a large index file only keep the touched pages in memory.
pub struct IndexCache<K, M> {
    /// Cache for index metadata.
    index_metadata: moka::sync::Cache<K, Arc<M>>,
    /// Cache for index content.
    index: moka::sync::Cache<(K, PageKey), Bytes>,
    /// Page size for index content.
    page_size: u64,

    /// Weighter for metadata.
    weight_of_metadata: fn(&K, &Arc<M>) -> u32,
    /// Weighter for content.
    weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
}
114
115impl<K, M> IndexCache<K, M>
116where
117    K: Hash + Eq + Send + Sync + 'static,
118    M: Send + Sync + 'static,
119{
120    pub fn new_with_weighter(
121        index_metadata_cap: u64,
122        index_content_cap: u64,
123        page_size: u64,
124        index_type: &'static str,
125        weight_of_metadata: fn(&K, &Arc<M>) -> u32,
126        weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
127    ) -> Self {
128        common_telemetry::debug!(
129            "Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"
130        );
131        let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
132            .name(&format!("index_metadata_{}", index_type))
133            .weigher(weight_of_metadata)
134            .eviction_listener(move |k, v, _cause| {
135                let size = weight_of_metadata(&k, &v);
136                CACHE_BYTES
137                    .with_label_values(&[INDEX_METADATA_TYPE])
138                    .sub(size.into());
139            })
140            .support_invalidation_closures()
141            .build();
142        let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
143            .name(&format!("index_content_{}", index_type))
144            .weigher(weight_of_content)
145            .eviction_listener(move |k, v, _cause| {
146                let size = weight_of_content(&k, &v);
147                CACHE_BYTES
148                    .with_label_values(&[INDEX_CONTENT_TYPE])
149                    .sub(size.into());
150            })
151            .support_invalidation_closures()
152            .build();
153        Self {
154            index_metadata,
155            index: index_cache,
156            page_size,
157            weight_of_content,
158            weight_of_metadata,
159        }
160    }
161}
162
163impl<K, M> IndexCache<K, M>
164where
165    K: Hash + Eq + Clone + Copy + Send + Sync + 'static,
166    M: Send + Sync + 'static,
167{
168    pub fn get_metadata(&self, key: K) -> Option<Arc<M>> {
169        self.index_metadata.get(&key)
170    }
171
172    pub fn put_metadata(&self, key: K, metadata: Arc<M>) {
173        CACHE_BYTES
174            .with_label_values(&[INDEX_METADATA_TYPE])
175            .add((self.weight_of_metadata)(&key, &metadata).into());
176        self.index_metadata.insert(key, metadata)
177    }
178
179    /// Gets given range of index data from cache, and loads from source if the file
180    /// is not already cached.
181    async fn get_or_load<F, Fut, E>(
182        &self,
183        key: K,
184        file_size: u64,
185        offset: u64,
186        size: u32,
187        load: F,
188    ) -> Result<(Vec<u8>, IndexCacheMetrics), E>
189    where
190        F: Fn(Vec<Range<u64>>) -> Fut,
191        Fut: Future<Output = Result<Vec<Bytes>, E>>,
192        E: std::error::Error,
193    {
194        let mut metrics = IndexCacheMetrics::default();
195        let page_keys =
196            PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
197        // Size is 0, return empty data.
198        if page_keys.is_empty() {
199            return Ok((Vec::new(), metrics));
200        }
201        metrics.num_pages = page_keys.len();
202        let mut data = Vec::with_capacity(page_keys.len());
203        data.resize(page_keys.len(), Bytes::new());
204        let mut cache_miss_range = vec![];
205        let mut cache_miss_idx = vec![];
206        let last_index = page_keys.len() - 1;
207        // TODO: Avoid copy as much as possible.
208        for (i, page_key) in page_keys.iter().enumerate() {
209            match self.get_page(key, *page_key) {
210                Some(page) => {
211                    CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
212                    metrics.cache_hit += 1;
213                    metrics.page_bytes += page.len() as u64;
214                    data[i] = page;
215                }
216                None => {
217                    CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
218                    metrics.cache_miss += 1;
219                    let base_offset = page_key.page_id * self.page_size;
220                    let pruned_size = if i == last_index {
221                        prune_size(page_keys.iter(), file_size, self.page_size)
222                    } else {
223                        self.page_size
224                    };
225                    cache_miss_range.push(base_offset..base_offset + pruned_size);
226                    cache_miss_idx.push(i);
227                }
228            }
229        }
230        if !cache_miss_range.is_empty() {
231            let pages = load(cache_miss_range).await?;
232            for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
233                let page_key = page_keys[i];
234                metrics.page_bytes += page.len() as u64;
235                data[i] = page.clone();
236                self.put_page(key, page_key, page.clone());
237            }
238        }
239        let buffer = Buffer::from_iter(data.into_iter());
240        Ok((
241            buffer
242                .slice(PageKey::calculate_range(offset, size, self.page_size))
243                .to_vec(),
244            metrics,
245        ))
246    }
247
248    fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
249        self.index.get(&(key, page_key))
250    }
251
252    fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
253        // Clones the value to ensure it doesn't reference a larger buffer.
254        let value = Bytes::from(value.to_vec());
255        CACHE_BYTES
256            .with_label_values(&[INDEX_CONTENT_TYPE])
257            .add((self.weight_of_content)(&(key, page_key), &value).into());
258        self.index.insert((key, page_key), value);
259    }
260
261    /// Invalidates all cache entries whose keys satisfy `predicate`.
262    pub fn invalidate_if<F>(&self, predicate: F)
263    where
264        F: Fn(&K) -> bool + Send + Sync + 'static,
265    {
266        let predicate = Arc::new(predicate);
267        let metadata_predicate = Arc::clone(&predicate);
268
269        self.index_metadata
270            .invalidate_entries_if(move |key, _| metadata_predicate(key))
271            .expect("cache should support invalidation closures");
272
273        self.index
274            .invalidate_entries_if(move |(key, _), _| predicate(key))
275            .expect("cache should support invalidation closures");
276    }
277}
278
279/// Prunes the size of the last page based on the indexes.
280/// We have following cases:
281/// 1. The rest file size is less than the page size, read to the end of the file.
282/// 2. Otherwise, read the page size.
283fn prune_size<'a>(
284    indexes: impl Iterator<Item = &'a PageKey>,
285    file_size: u64,
286    page_size: u64,
287) -> u64 {
288    let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
289    page_size.min(file_size - last_page_start)
290}